[Refactor] support incremental scan ranges deployment #50198

Closed
15 changes: 14 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/planner/HdfsScanNode.java
@@ -106,7 +106,15 @@ private void setupCloudCredential() {

    @Override
    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
-       return scanRangeSource.getAllOutputs();
        if (maxScanRangeLength == 0) {
            return scanRangeSource.getAllOutputs();
        }
        return scanRangeSource.getOutputs((int) maxScanRangeLength);
    }

    @Override
    public boolean hasMoreScanRanges() {
        return scanRangeSource.hasMoreOutput();
    }

    @Override
@@ -272,4 +280,9 @@ public boolean canUseRuntimeAdaptiveDop() {
    protected boolean supportTopNRuntimeFilter() {
        return true;
    }

    @Override
    public boolean isIncrementalScanRangesSupported() {
        return true;
    }
}
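
Taken together, the three overrides let a scheduler drain an HDFS scan node in batches instead of materializing every scan range up front. A minimal consumption loop might look like the following sketch; the driver itself and the batch size are illustrative assumptions, not code from this PR:

    // Illustrative driver, not part of this PR: drain a scan node incrementally.
    List<TScanRangeLocations> collected = new ArrayList<>();
    if (scanNode.isIncrementalScanRangesSupported()) {
        long batchSize = 4; // assumed; the PR wires this to connector_remote_file_async_task_size
        do {
            collected.addAll(scanNode.getScanRangeLocations(batchSize));
        } while (scanNode.hasMoreScanRanges());
    } else {
        collected.addAll(scanNode.getScanRangeLocations(0)); // 0 preserves the old all-at-once path
    }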
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -100,6 +100,14 @@ public boolean isLocalNativeTable() {
        return false;
    }

    public boolean isIncrementalScanRangesSupported() {
        return false;
    }

    public boolean hasMoreScanRanges() {
        return false;
    }

    /**
     * cast expr to SlotDescriptor type
     */
CoordinatorPreprocessor.java:
@@ -36,6 +36,7 @@
import com.starrocks.qe.scheduler.assignment.FragmentAssignmentStrategyFactory;
import com.starrocks.qe.scheduler.dag.ExecutionDAG;
import com.starrocks.qe.scheduler.dag.ExecutionFragment;
import com.starrocks.qe.scheduler.dag.FragmentInstance;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
@@ -258,6 +259,14 @@ void computeFragmentInstances() throws UserException {
        executionDAG.finalizeDAG();
    }

    public void assignIncrementalScanRangesToFragmentInstances(ExecutionFragment execFragment) throws UserException {
        execFragment.getScanRangeAssignment().clear();
        for (FragmentInstance instance : execFragment.getInstances()) {
            instance.getNode2ScanRanges().clear();
        }
        fragmentAssignmentStrategyFactory.create(execFragment, workerProvider).assignFragmentToWorker(execFragment);
    }
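
The helper clears both the fragment-level assignment and each instance's per-node scan ranges before re-running the assignment strategy, so every incremental pass deploys only the newly fetched batch rather than accumulating earlier ones. A hypothetical outer loop over it (not part of this diff) would look like:

    // Illustrative only: reassign and redeploy while any scan node has ranges left.
    while (execFragment.getScanNodes().stream().anyMatch(ScanNode::hasMoreScanRanges)) {
        coordinatorPreprocessor.assignIncrementalScanRangesToFragmentInstances(execFragment);
        // ... build incremental requests and send them to the backends here ...
    }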

    private void validateExecutionDAG() throws StarRocksPlannerException {
        for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) {
            DataSink sink = execFragment.getPlanFragment().getSink();
DefaultCoordinator.java:
@@ -56,6 +56,7 @@
import com.starrocks.datacache.DataCacheSelectMetrics;
import com.starrocks.mysql.MysqlCommand;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.PlanFragmentId;
import com.starrocks.planner.ResultSink;
import com.starrocks.planner.RuntimeFilterDescription;
import com.starrocks.planner.ScanNode;
@@ -74,6 +75,7 @@
import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.qe.scheduler.dag.PhasedExecutionSchedule;
import com.starrocks.qe.scheduler.slot.DeployState;
import com.starrocks.qe.scheduler.slot.LogicalSlot;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
@@ -82,6 +84,7 @@
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TDescriptorTable;
import com.starrocks.thrift.TExecPlanFragmentParams;
import com.starrocks.thrift.TLoadJobType;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TQueryType;
@@ -101,6 +104,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -478,6 +482,12 @@ public void prepareExec() throws Exception {
            jobSpec.getFragments().forEach(fragment -> fragment.limitMaxPipelineDop(slot.getPipelineDop()));
        }

        if (connectContext != null) {
            if (connectContext.getSessionVariable().isEnableConnectorIncrementalScanRanges()) {
                jobSpec.setIncrementalScanRanges(true);
            }
        }

        coordinatorPreprocessor.prepareExec();

        prepareResultSink();
@@ -636,14 +646,69 @@ private void deliverExecFragments(boolean needDeploy) throws RpcException, UserE
            Deployer deployer =
                    new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(),
                            this::handleErrorExecution, needDeploy);
-           schedule.prepareSchedule(deployer, executionDAG);
            schedule.prepareSchedule(this, deployer, executionDAG);
            this.schedule.schedule();
            queryProfile.attachExecutionProfiles(executionDAG.getExecutions());
        } finally {
            unlock();
        }
    }

    @Override
    public List<DeployState> assignIncrementalScanRangesToDeployStates(Deployer deployer, List<DeployState> deployStates)
            throws UserException {
        List<DeployState> updatedStates = new ArrayList<>();
        if (!jobSpec.isIncrementalScanRanges()) {
            return updatedStates;
        }
        for (DeployState state : deployStates) {
            Set<PlanFragmentId> planFragmentIds = new HashSet<>();
            for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
                for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
                    planFragmentIds.add(execState.getFragmentId());
                }
            }

            Set<PlanFragmentId> updatedPlanFragmentIds = new HashSet<>();
            for (PlanFragmentId fragmentId : planFragmentIds) {
                boolean hasMoreScanRanges = false;
                ExecutionFragment fragment = executionDAG.getFragment(fragmentId);
                for (ScanNode scanNode : fragment.getScanNodes()) {
                    if (scanNode.hasMoreScanRanges()) {
                        hasMoreScanRanges = true;
                    }
                }
                if (hasMoreScanRanges) {
                    coordinatorPreprocessor.assignIncrementalScanRangesToFragmentInstances(fragment);
                    updatedPlanFragmentIds.add(fragmentId);
                }
            }

            if (updatedPlanFragmentIds.isEmpty()) {
                continue;
            }

            DeployState newState = new DeployState();
            updatedStates.add(newState);
            int index = 0;
            for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
                List<FragmentInstanceExecState> res = newState.getThreeStageExecutionsToDeploy().get(index);
                index += 1;
                for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
                    if (!updatedPlanFragmentIds.contains(execState.getFragmentId())) {
                        continue;
                    }
                    FragmentInstance instance = execState.getFragmentInstance();
                    TExecPlanFragmentParams request = deployer.createIncrementalScanRangesRequest(instance);
                    execState.setRequestToDeploy(request);
                    res.add(execState);
                }
            }
        }
        return updatedStates;
    }
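
Since prepareSchedule now receives the coordinator itself (see deliverExecFragments above), the schedule can keep requesting follow-up deployment waves until this method returns an empty list. A plausible scheduler-side pattern, assumed for illustration since the schedule implementation is not shown in this diff:

    // Hypothetical consumption loop (deploy(...) is a stand-in for the schedule's
    // actual deployment step): redeploy deltas until no fragment has more ranges.
    List<DeployState> states = initialStates;
    while (!states.isEmpty()) {
        deploy(states); // sends each execution's TExecPlanFragmentParams
        states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
    }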

    private void handleErrorExecution(Status status, FragmentInstanceExecState execution, Throwable failure)
            throws UserException, RpcException {
        cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
Review comment:
The most risky bug in this code is:
The assignIncrementalScanRangesToDeployStates method could produce an IndexOutOfBoundsException.

You can modify the code like this:

@Override
public List<DeployState> assignIncrementalScanRangesToDeployStates(Deployer deployer, List<DeployState> deployStates)
        throws UserException {
    List<DeployState> updatedStates = new ArrayList<>();
    if (!jobSpec.isIncrementalScanRanges()) {
        return updatedStates;
    }
    for (DeployState state : deployStates) {

        Set<PlanFragmentId> planFragmentIds = new HashSet<>();
        for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
            for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
                planFragmentIds.add(execState.getFragmentId());
            }
        }

        Set<PlanFragmentId> updatedPlanFragmentIds = new HashSet<>();
        for (PlanFragmentId fragmentId : planFragmentIds) {
            boolean hasMoreScanRanges = false;
            ExecutionFragment fragment = executionDAG.getFragment(fragmentId);
            for (ScanNode scanNode : fragment.getScanNodes()) {
                if (scanNode.hasMoreScanRanges()) {
                    hasMoreScanRanges = true;
                }
            }
            if (hasMoreScanRanges) {
                coordinatorPreprocessor.assignIncrementalScanRangesToFragmentInstances(fragment);
                updatedPlanFragmentIds.add(fragmentId);
            }
        }

        if (updatedPlanFragmentIds.isEmpty()) {
            continue;
        }

        DeployState newState = new DeployState();
        updatedStates.add(newState);
        int index = 0;
        for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
            // Ensure there are enough elements
            while (newState.getThreeStageExecutionsToDeploy().size() <= index) {
                newState.getThreeStageExecutionsToDeploy().add(new ArrayList<>());
            }
            List<FragmentInstanceExecState> res = newState.getThreeStageExecutionsToDeploy().get(index);
            index += 1;
            for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
                if (!updatedPlanFragmentIds.contains(execState.getFragmentId())) {
                    continue;
                }
                FragmentInstance instance = execState.getFragmentInstance();
                TExecPlanFragmentParams request = deployer.createIncrementalScanRangesRequest(instance);
                execState.setRequestToDeploy(request);
                res.add(execState);
            }
        }
    }
    return updatedStates;
}

SessionVariable.java:
@@ -782,6 +782,7 @@ public static MaterializedViewRewriteMode parse(String str) {

    public static final String CONNECTOR_REMOTE_FILE_ASYNC_QUEUE_SIZE = "connector_remote_file_async_queue_size";
    public static final String CONNECTOR_REMOTE_FILE_ASYNC_TASK_SIZE = "connector_remote_file_async_task_size";
    public static final String ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES = "enable_connector_incremental_scan_ranges";

    public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
            .add(CODEGEN_LEVEL)
@@ -2127,6 +2128,9 @@ public SessionVariable setHiveTempStagingDir(String hiveTempStagingDir) {
    @VarAttr(name = CONNECTOR_REMOTE_FILE_ASYNC_TASK_SIZE, flag = VariableMgr.INVISIBLE)
    private int connectorRemoteFileAsyncTaskSize = 4;

    @VarAttr(name = ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES)
    private boolean enableConnectorIncrementalScanRanges = false;

    public SessionVariableConstants.ChooseInstancesMode getChooseExecuteInstancesMode() {
        return Enums.getIfPresent(SessionVariableConstants.ChooseInstancesMode.class,
                StringUtils.upperCase(chooseExecuteInstancesMode))
@@ -4114,6 +4118,10 @@ public int getConnectorRemoteFileAsyncTaskSize() {
        return connectorRemoteFileAsyncTaskSize;
    }

    public boolean isEnableConnectorIncrementalScanRanges() {
        return enableConnectorIncrementalScanRanges;
    }
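
The new variable defaults to false, so existing deployments keep the one-shot behavior. When a session enables it (SET enable_connector_incremental_scan_ranges = true), prepareExec marks the job spec, and the backend selector reuses connector_remote_file_async_task_size as the per-batch budget. A condensed sketch of that wiring, pieced together from the diffs in this PR:

    // Condensed from this PR's changes: how the session flag reaches the scan node.
    SessionVariable sv = connectContext.getSessionVariable();
    if (sv.isEnableConnectorIncrementalScanRanges()) {
        jobSpec.setIncrementalScanRanges(true); // flips the job-level switch
    }
    // Later, during backend selection:
    int maxScanRangeLength = 0;
    if (useIncrementalScanRanges && scanNode.isIncrementalScanRangesSupported()) {
        maxScanRangeLength = sv.getConnectorRemoteFileAsyncTaskSize(); // batch size per fetch
    }
    List<TScanRangeLocations> locations = scanNode.getScanRangeLocations(maxScanRangeLength);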

    // Serialize to thrift object
    // used for rest api
    public TQueryOptions toThrift() {
Coordinator.java (fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java):
@@ -16,6 +16,7 @@

import com.starrocks.analysis.DescriptorTable;
import com.starrocks.common.Status;
import com.starrocks.common.UserException;
import com.starrocks.common.util.RuntimeProfile;
import com.starrocks.datacache.DataCacheSelectMetrics;
import com.starrocks.planner.PlanFragment;
@@ -26,6 +27,7 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.QueryStatisticsItem;
import com.starrocks.qe.RowBatch;
import com.starrocks.qe.scheduler.slot.DeployState;
import com.starrocks.qe.scheduler.slot.LogicalSlot;
import com.starrocks.sql.LoadPlanner;
import com.starrocks.sql.plan.ExecPlan;
@@ -80,8 +82,8 @@
                                              long warehouseId);

        Coordinator createRefreshDictionaryCacheScheduler(ConnectContext context, TUniqueId queryId,
                                                          DescriptorTable descTable, List<PlanFragment> fragments,
                                                          List<ScanNode> scanNodes);
}

// ------------------------------------------------------------------------------------
@@ -129,6 +131,10 @@

    public abstract void cancel(PPlanFragmentCancelReason reason, String message);

    public List<DeployState> assignIncrementalScanRangesToDeployStates(Deployer deployer, List<DeployState> deployStates) throws UserException {
        return List.of();
    }

Check failure on line 134 in fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java — GitHub Actions / FE Code Style Check: [checkstyle] Line is longer than 130 characters (found 144). (LineLengthCheck, flagging the method signature above.)

    public abstract void onFinished();

    public abstract LogicalSlot getSlot();
Deployer.java:
@@ -204,6 +204,7 @@ private void createFragmentInstanceExecStates(ExecutionFragment fragment,
                fragment.getFragmentIndex(),
                request,
                instance.getWorker());
        execution.setFragmentInstance(instance);

        threeStageExecutionsToDeploy.get(stageIndex).add(execution);

@@ -250,5 +251,9 @@ private void waitForDeploymentCompletion(List<FragmentInstanceExecState> executi
            failureHandler.apply(firstErrResult.getStatus(), firstErrExecution, firstErrResult.getFailure());
        }
    }

    public TExecPlanFragmentParams createIncrementalScanRangesRequest(FragmentInstance instance) {
        return tFragmentInstanceFactory.createIncrementalScanRanges(instance);
    }
}

TFragmentInstanceFactory.java:
@@ -39,6 +39,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

public class TFragmentInstanceFactory {
@@ -90,6 +91,17 @@ public TExecPlanFragmentParams create(FragmentInstance instance,
        return result;
    }

    public TExecPlanFragmentParams createIncrementalScanRanges(FragmentInstance instance) {
        TExecPlanFragmentParams result = new TExecPlanFragmentParams();
        result.setProtocol_version(InternalServiceVersion.V1);
        result.setParams(new TPlanFragmentExecParams());
        result.params.setQuery_id(jobSpec.getQueryId());
        result.params.setFragment_instance_id(instance.getInstanceId());
        result.params.setPer_node_scan_ranges(instance.getNode2ScanRanges());
        result.params.setPer_exch_num_senders(new HashMap<>());
        return result;
    }
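
Compared with the full create() above, the incremental request is deliberately thin: it carries just enough identity to reach the already-running instance plus the new scan ranges. A hedged reading of the fields built here (factory stands for a TFragmentInstanceFactory instance; field names are taken from the code above):

    TExecPlanFragmentParams delta = factory.createIncrementalScanRanges(instance);
    // delta.params.query_id             -- routes the request to the running query
    // delta.params.fragment_instance_id -- routes it to the running fragment instance
    // delta.params.per_node_scan_ranges -- the only real payload: new ranges per scan node
    // per_exch_num_senders is an empty map, presumably just to satisfy the thrift
    // schema; no exchange topology is re-sent on an incremental deployment.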

    public void toThriftFromCommonParams(TExecPlanFragmentParams result,
                                         ExecutionFragment execFragment,
                                         TDescriptorTable descTable,
BackendSelectorFactory.java:
@@ -50,16 +50,22 @@ public static BackendSelector create(ScanNode scanNode,
                                         ExecutionFragment execFragment,
                                         WorkerProvider workerProvider,
                                         ConnectContext connectContext,
-                                        Set<Integer> destReplicatedScanIds) {
                                         Set<Integer> destReplicatedScanIds,
                                         boolean useIncrementalScanRanges) {
        SessionVariable sessionVariable = connectContext.getSessionVariable();
        FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment();

-       // The parameters of getScanRangeLocations may ignore, It doesn't take effect.
-       List<TScanRangeLocations> locations = scanNode.getScanRangeLocations(0);
        int maxScanRangeLength = 0;
        if (useIncrementalScanRanges && scanNode.isIncrementalScanRangesSupported()) {
            maxScanRangeLength = sessionVariable.getConnectorRemoteFileAsyncTaskSize();
        }

        List<TScanRangeLocations> locations = scanNode.getScanRangeLocations(maxScanRangeLength);
        if (locations == null) {
            return new NoopBackendSelector();
        }

-       SessionVariable sessionVariable = connectContext.getSessionVariable();
-       FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment();

        if (scanNode instanceof SchemaScanNode) {
            return new NormalBackendSelector(scanNode, locations, assignment, workerProvider, false);
        } else if (scanNode instanceof HdfsScanNode || scanNode instanceof IcebergScanNode ||
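
With the flag on, maxScanRangeLength doubles as a batch-size hint: each assignment pass asks the scan node for at most connector_remote_file_async_task_size ranges instead of the full list, and hasMoreScanRanges() tells the caller whether another pass is needed. Illustratively (assuming computeScanRangeAssignment as the selector's entry point):

    // Sketch only: each pass through the factory consumes at most one batch.
    BackendSelector selector = create(scanNode, execFragment, workerProvider,
            connectContext, destReplicatedScanIds, /* useIncrementalScanRanges */ true);
    selector.computeScanRangeAssignment(); // assigns the first batch to workers
    // If scanNode.hasMoreScanRanges() is still true, a later incremental pass
    // repeats the assignment for the next batch.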
FragmentAssignmentStrategyFactory.java:
@@ -48,7 +48,7 @@ public FragmentAssignmentStrategy create(ExecutionFragment execFragment, WorkerP
                    executionDAG.isGatherOutput(), random);
        } else {
            return new LocalFragmentAssignmentStrategy(connectContext, workerProvider, jobSpec.isEnablePipeline(),
-                   jobSpec.isLoadType());
                    jobSpec.isLoadType(), jobSpec.isIncrementalScanRanges());
        }
    }
}