[Refactor] support incremental scan ranges deployment #50198
Conversation
Signed-off-by: yanz <[email protected]>
```java
@@ -315,7 +328,7 @@ private boolean scheduleNext(List<List<ExecutionFragment>> scheduleFragments) {
        }

        if (groups.size() != fragment.childrenSize()) {
            List<PackedExecutionFragment> fragments = Lists.newArrayList();
            for (int i = fragment.childrenSize() - 1; i >= 0; i--) {
                final ExecutionFragment child = sequenceMap.get(fragment.getFragmentId()).getAt(i);
                final PlanFragmentId childFragmentId = child.getFragmentId();
```
The most risky bug in this code is: the `while (true)` loop does not handle the case where `deployState` becomes empty, leading to a potential infinite loop. You can modify the code like this:

```java
while (deployState != null && !deployState.isEmpty()) {
    deployState = coordinator.assignIncrementalScanRangesToDeployStates(deployer, deployState);
    for (DeployState state : deployState) {
        deployer.deployFragments(state);
    }
}
```
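To illustrate why the guard matters, here is a minimal, self-contained sketch of the same drain-loop pattern. The names (`IncrementalDeployLoop`, `nextRound`, `drain`) are illustrative stand-ins, not the StarRocks API: the guarded loop terminates as soon as the pending list is empty instead of spinning forever.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: drains pending work in rounds, stopping when the
// producer returns an empty list rather than looping unconditionally.
public class IncrementalDeployLoop {
    // Stand-in for one incremental-assignment round: consumes the first
    // pending batch and returns whatever remains.
    static List<Integer> nextRound(List<Integer> pending) {
        if (pending.isEmpty()) {
            return pending;
        }
        return new ArrayList<>(pending.subList(1, pending.size()));
    }

    // Returns the number of deployment rounds executed.
    static int drain(List<Integer> pending) {
        int rounds = 0;
        // Guarded loop: exits as soon as there is nothing left to deploy.
        while (pending != null && !pending.isEmpty()) {
            pending = nextRound(pending);
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(drain(new ArrayList<>(List.of(1, 2, 3)))); // 3 rounds
        System.out.println(drain(new ArrayList<>()));                 // 0 rounds
    }
}
```

With a `while (true)` body, an empty result from `nextRound` would never break the loop; the explicit emptiness check makes termination depend only on the data, not on the producer's behavior.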
```java
        } else {
            instance =
                    new FragmentInstance(workerProvider.getWorkerById(workerId), execFragment);
        }
        execFragment.addInstance(instance);

        if (!enableAssignScanRangesPerDriverSeq(fragment, scanRanges)) {
```
The most risky bug in this code is: `List.of()` creates an immutable list, which will cause issues if the scan ranges are modified later. You can modify the code like this:

```diff
@@ -239,7 +245,20 @@ private void assignScanRangesToNormalFragmentInstancePerWorker(ExecutionFragment
     final int parallelExecInstanceNum = fragment.getParallelExecNum();
     final int pipelineDop = fragment.getPipelineDop();
-    execFragment.getScanRangeAssignment().forEach((workerId, scanRangesPerWorker) -> {
+    FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment();
+
+    if (useIncrementalScanRanges) {
+        for (ScanNode scanNode : execFragment.getScanNodes()) {
+            if (scanNode.isIncrementalScanRangesSupported()) {
+                // TODO(yan): put some scan range for ending.
+                for (ComputeNode computeNode : workerProvider.getAllWorkers()) {
+                    assignment.putAll(computeNode.getId(), scanNode.getId().asInt(), new ArrayList<>());
+                }
+            }
+        }
+    }
+
+    assignment.forEach((workerId, scanRangesPerWorker) -> {
         // 1. Handle normal scan node firstly
         scanRangesPerWorker.forEach((scanId, scanRangesOfNode) -> {
             if (replicatedScanIds.contains(scanId)) {
```

Replace `List.of()` with `new ArrayList<>()` to create a mutable list.
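The difference is easy to demonstrate in isolation. The sketch below uses a hypothetical helper, not project code: appending to a list returned by `List.of()` throws `UnsupportedOperationException`, while `new ArrayList<>()` accepts the element.

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates why List.of() is unsafe wherever later code appends scan
// ranges: it returns an unmodifiable list, so add() throws.
public class MutableListDemo {
    // Returns true if the append succeeded, false if the list was immutable.
    static boolean tryAppend(List<String> scanRanges) {
        try {
            scanRanges.add("tablet-1");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryAppend(List.of()));          // false: immutable
        System.out.println(tryAppend(new ArrayList<>()));  // true: mutable
    }
}
```

This is why the suggested fix seeds the assignment with `new ArrayList<>()`: the empty placeholder must remain appendable when incremental scan ranges arrive.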
```java
        }
        return updatedStates;
    }

    private void handleErrorExecution(Status status, FragmentInstanceExecState execution, Throwable failure)
            throws UserException, RpcException {
        cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
```
The most risky bug in this code is: the `assignIncrementalScanRangesToDeployStates` method could throw an `IndexOutOfBoundsException` when indexing into the new state's execution lists. You can modify the code like this:

```java
@Override
public List<DeployState> assignIncrementalScanRangesToDeployStates(Deployer deployer, List<DeployState> deployStates)
        throws UserException {
    List<DeployState> updatedStates = new ArrayList<>();
    if (!jobSpec.isIncrementalScanRanges()) {
        return updatedStates;
    }
    for (DeployState state : deployStates) {
        Set<PlanFragmentId> planFragmentIds = new HashSet<>();
        for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
            for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
                planFragmentIds.add(execState.getFragmentId());
            }
        }
        Set<PlanFragmentId> updatedPlanFragmentIds = new HashSet<>();
        for (PlanFragmentId fragmentId : planFragmentIds) {
            boolean hasMoreScanRanges = false;
            ExecutionFragment fragment = executionDAG.getFragment(fragmentId);
            for (ScanNode scanNode : fragment.getScanNodes()) {
                if (scanNode.hasMoreScanRanges()) {
                    hasMoreScanRanges = true;
                }
            }
            if (hasMoreScanRanges) {
                coordinatorPreprocessor.assignIncrementalScanRangesToFragmentInstances(fragment);
                updatedPlanFragmentIds.add(fragmentId);
            }
        }
        if (updatedPlanFragmentIds.isEmpty()) {
            continue;
        }
        DeployState newState = new DeployState();
        updatedStates.add(newState);
        int index = 0;
        for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
            // Ensure there are enough elements before calling get(index).
            while (newState.getThreeStageExecutionsToDeploy().size() <= index) {
                newState.getThreeStageExecutionsToDeploy().add(new ArrayList<>());
            }
            List<FragmentInstanceExecState> res = newState.getThreeStageExecutionsToDeploy().get(index);
            index += 1;
            for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
                if (!updatedPlanFragmentIds.contains(execState.getFragmentId())) {
                    continue;
                }
                FragmentInstance instance = execState.getFragmentInstance();
                TExecPlanFragmentParams request = deployer.createIncrementalScanRangesRequest(instance);
                execState.setRequestToDeploy(request);
                res.add(execState);
            }
        }
    }
    return updatedStates;
}
```
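The padding idiom in the suggested fix (the `while` loop that grows the nested list before calling `get(index)`) can be shown on its own. The names below are illustrative, not StarRocks APIs: the helper guarantees that `get(index)` is in bounds by appending empty sublists first.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "ensure there are enough elements" idiom: grow a nested
// list with empty sublists so get(index) cannot throw
// IndexOutOfBoundsException.
public class PaddedNestedList {
    // Returns the sublist at index, creating empty sublists as needed.
    static List<Integer> getOrCreate(List<List<Integer>> stages, int index) {
        // Pad the outer list up to and including the requested index.
        while (stages.size() <= index) {
            stages.add(new ArrayList<>());
        }
        return stages.get(index);
    }

    public static void main(String[] args) {
        List<List<Integer>> stages = new ArrayList<>();
        // Indexing position 2 of an empty outer list would normally throw;
        // the helper pads first, then returns a mutable sublist.
        getOrCreate(stages, 2).add(42);
        System.out.println(stages.size());  // 3
        System.out.println(stages.get(2));  // [42]
    }
}
```

Without the padding loop, `newState.getThreeStageExecutionsToDeploy().get(index)` would fail whenever the freshly created `DeployState` has fewer stage lists than the source state being copied.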
Why I'm doing:
What I'm doing:
Fixes #50196
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: