[Enhancement] support incremental scan ranges deployment at FE side #50189
Conversation
Quality Gate passed
[BE Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[Java-Extensions Incremental Coverage Report] ✅ pass : 0 / 0 (0%)
[FE Incremental Coverage Report] ✅ pass : 158 / 163 (96.93%)
If you want to test this, test it together with the BE-side change ([Enhancement] support incremental scan ranges deployment at BE side by dirtysalt · Pull Request #50254 · StarRocks/starrocks); some modifications are needed on the BE side too.
Could you avoid sending empty scan ranges to all BEs? Maybe we can send the incremental scan ranges only to the workers that already have a fragment instance (see the sketch below).
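For illustration, a minimal sketch of that targeted deployment, assuming hypothetical types and accessors (DeployState shape, worker ids); this is not the actual StarRocks scheduler API:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: deploy incremental scan ranges only to workers
// that already host a fragment instance, instead of broadcasting
// empty scan ranges to every BE.
class IncrementalDeploySketch {
    record ScanRange(long scanNodeId, String split) {}
    record FragmentInstance(long workerId) {}

    static Map<Long, List<ScanRange>> filterByExistingWorkers(
            Map<Long, List<ScanRange>> workerToRanges,   // worker id -> new incremental ranges
            List<FragmentInstance> deployedInstances) {  // instances deployed in earlier rounds
        Set<Long> workersWithInstances = new HashSet<>();
        for (FragmentInstance instance : deployedInstances) {
            workersWithInstances.add(instance.workerId());
        }
        Map<Long, List<ScanRange>> filtered = new HashMap<>();
        for (Map.Entry<Long, List<ScanRange>> e : workerToRanges.entrySet()) {
            // Skip workers that never received a fragment instance; they
            // would otherwise get an empty, no-op deployment RPC.
            if (workersWithInstances.contains(e.getKey()) && !e.getValue().isEmpty()) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }
}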
How do you deal with incremental partitions in HiveTable's toThrift?
We don't handle it in this PR. For the hive scan node, there is no need to handle incremental partitions.
Yes. It would be more complicated; we can optimize it in a follow-up PR. Anyway, it's just an optimization.
I cannot reply to this comment directly. No problem, I can fix that.
if (connectContext != null) {
    if (connectContext.getSessionVariable().isEnableConnectorIncrementalScanRanges()) {
        jobSpec.setIncrementalScanRanges(true);
    }
}
Better to move this to JobSpec::Builder::commonProperties and not expose setIncrementalScanRanges.
public Builder commonProperties(ConnectContext context) {
    TWorkGroup newResourceGroup = prepareResourceGroup(
            context, ResourceGroupClassifier.QueryType.fromTQueryType(instance.queryOptions.getQuery_type()));
    this.resourceGroup(newResourceGroup);
    this.enablePipeline(isEnablePipeline(context, instance.fragments));
    instance.connectContext = context;
    instance.enableQueue = isEnableQueue(context);
    instance.needQueued = needCheckQueue();
    instance.enableGroupLevelQueue = instance.enableQueue && GlobalVariable.isEnableGroupLevelQueryQueue();
+   instance.incrementalScanRanges = context.getSessionVariable().isEnableConnectorIncrementalScanRanges();
    return this;
}
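Centralizing the flag in commonProperties keeps all session-derived JobSpec state in one builder step instead of exposing a mutable setter on the spec. As a usage note, the getter's name suggests the feature is toggled per session with something like SET enable_connector_incremental_scan_ranges = true; the exact variable name is an inference from the getter and should be verified against the SessionVariable definition.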
Why I'm doing:
What I'm doing:
CoordinatorPreprocessor

Added the assignIncrementalScanRangesToFragmentInstances interface, which recalculates the scan ranges of all fragment instances below a fragment. Currently only the node2ScanRanges distribution is supported; more complex data distributions such as node + driver_seq -> scan range are not supported for incremental deployment.
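A rough sketch of this assignment at the granularity the PR supports; the types (ScanRange, Instance) and the round-robin policy are illustrative assumptions, not the PR's actual code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of incremental scan-range assignment at the
// node2ScanRanges granularity (node + driver_seq -> scan range
// distributions are out of scope).
class IncrementalAssignmentSketch {
    record ScanRange(String split) {}

    static class Instance {
        // scan node id -> scan ranges assigned to this instance
        final Map<Integer, List<ScanRange>> node2ScanRanges = new HashMap<>();
    }

    // Spread a new batch of ranges for one scan node across the
    // fragment's instances (assumes a non-empty instance list).
    static void assignIncrementalScanRanges(int scanNodeId,
                                            List<ScanRange> newRanges,
                                            List<Instance> instances) {
        for (int i = 0; i < newRanges.size(); i++) {
            Instance target = instances.get(i % instances.size()); // simple round robin
            target.node2ScanRanges
                  .computeIfAbsent(scanNodeId, k -> new ArrayList<>())
                  .add(newRanges.get(i));
        }
    }
}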
ExecutionSchedule

We currently support two scheduling methods: all-at-once and phased. The difference is that they schedule different sets of fragment instances.

The change to the code is that after each set of fragment instances is scheduled, the scheduler immediately checks whether any scan nodes in those fragment instances still have unscheduled scan ranges. If there are, the scheduling process continues. The method used for the subsequent rounds is coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
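The control flow, paraphrased as a sketch; the Coordinator/Deployer interfaces and method signatures here are assumptions for illustration, not the real scheduler types:

import java.util.List;

// Sketch of the schedule loop: after each batch of fragment instances
// is dispatched, check whether any scan node still has undelivered
// scan ranges; if so, assign the next increment and dispatch again.
class PhasedScheduleSketch {
    interface Coordinator {
        // Fill deploy states with the next increment of scan ranges;
        // returns true if anything new was assigned (hypothetical).
        boolean assignIncrementalScanRangesToDeployStates(List<String> deployStates);
    }

    interface Deployer {
        void deploy(List<String> deployStates);
    }

    static void scheduleFragmentInstances(Coordinator coordinator,
                                          Deployer deployer,
                                          List<String> deployStates) {
        deployer.deploy(deployStates); // first round: initial scan ranges
        // keep going while the scan nodes report more scan ranges
        while (coordinator.assignIncrementalScanRangesToDeployStates(deployStates)) {
            deployer.deploy(deployStates);
        }
    }
}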
Coordinator

Added the assignIncrementalScanRangesToDeployStates interface, which checks the scan nodes for more scan ranges based on the deploy states (i.e., the set of locally dispatched fragment instances) and updates node2ScanRanges under the fragment instances.
Arch Diagram (image not reproduced here)
Fixes #50196
What type of PR is this: Enhancement
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: