Skip to content

Commit

Permalink
branch-3.0: [fix](coordinator) Fix wrong bucket assignments by coordinator #45365 (#45401)
Browse files Browse the repository at this point in the history

Cherry-picked from #45365

Co-authored-by: Gabriel <[email protected]>
  • Loading branch information
github-actions[bot] and Gabriel39 authored Dec 13, 2024
1 parent 122d574 commit f53bb5d
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 24 deletions.
85 changes: 61 additions & 24 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2734,7 +2734,8 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
* 2. Use Nereids planner.
*/
boolean ignoreStorageDataDistribution = params.fragment != null && params.fragment.useSerialSource(context);
boolean ignoreStorageDataDistribution = scanNodes != null && !scanNodes.isEmpty()
&& params.fragment != null && params.fragment.useSerialSource(context);

FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
Expand All @@ -2744,33 +2745,69 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
= findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());

if (ignoreStorageDataDistribution) {
FInstanceExecParam instanceParam = new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params);

for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
= ListUtil.splitBySize(scanRange, parallelExecInstanceNum);
/**
* Split scan ranges evenly into `parallelExecInstanceNum` instances.
*
*
* For a fragment that contains a co-located join,
*
* scan (id = 0) -> join build (id = 2)
* |
* scan (id = 1) -> join probe (id = 2)
*
* If both of `scan (id = 0)` and `scan (id = 1)` are serial operators, we will plan local exchanger
* after them:
*
* scan (id = 0) -> local exchange -> join build (id = 2)
* |
* scan (id = 1) -> local exchange -> join probe (id = 2)
*
*
* And there is another more complicated scenario, for example, `scan (id = 0)` has 10 partitions and
* 3 buckets which means 3 * 10 tablets and `scan (id = 1)` has 3 buckets and no partition which means
* 3 tablets totally. If expected parallelism is 8, we will get a serial scan (id = 0) and a
* non-serial scan (id = 1). For this case, we will plan another plan with local exchange:
*
* scan (id = 0) -> local exchange -> join build (id = 2)
* |
* scan (id = 1) -> join probe (id = 2)
*/
FInstanceExecParam firstInstanceParam = null;
for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange
: perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params);

if (firstInstanceParam == null) {
firstInstanceParam = instanceParam;
}
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
int scanId = nodeScanRange.getKey();
Optional<ScanNode> node = scanNodes.stream().filter(
scanNode -> scanNode.getId().asInt() == scanId).findFirst();
Preconditions.checkArgument(node.isPresent());
FInstanceExecParam instanceParamToScan = node.get().isSerialOperator()
? firstInstanceParam : instanceParam;
if (!instanceParamToScan.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParamToScan.perNodeScanRanges
.put(nodeScanRange.getKey(), Lists.newArrayList());
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParamToScan.perNodeScanRanges.get(nodeScanRange.getKey())
.addAll(nodeScanRange.getValue());
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
.addAll(nodeScanRange.getValue());
}
params.instanceExecParams.add(instanceParam);
}
List<FInstanceExecParam> instanceExecParams = new ArrayList<>();
instanceExecParams.add(instanceParam);
for (int i = 1; i < parallelExecInstanceNum; i++) {
instanceExecParams.add(new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params));
}
int index = 0;
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
instanceExecParams.get(index % instanceExecParams.size()).addBucketSeq(nodeScanRangeMap.first);
index++;
for (int i = perInstanceScanRanges.size(); i < parallelExecInstanceNum; i++) {
params.instanceExecParams.add(new FInstanceExecParam(null, addressScanRange.getKey(), 0, params));
}
params.instanceExecParams.addAll(instanceExecParams);
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
10000007 10000007 48414 10000007
10000007 10000007 48414 10000007
10000007 10000007 48414 10000007
10000007 10000007 60426 10000007
10000007 10000007 60426 10000007
10000007 10000007 60426 10000007
10000007 10000007 94460 10000007
10000007 10000007 94460 10000007
10000007 10000007 94460 10000007

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: colocate join between two tables in the same colocation group
// ("test_colocate_join_with_different_tabletsgroup1") that have the same bucket
// count (16) but different partitioning — one table is unpartitioned, the other
// has many LIST partitions — so their total tablet counts differ.
// NOTE(review): given the commit context, this appears to target coordinator
// bucket-assignment for serial/non-serial colocated scans — confirm against #45365.
suite("test_colocate_join_with_different_tablets") {
    // Set up both colocated tables and seed matching rows (khh/khid = 10000007
    // in both tables, so the join produces a cross product of 3 x 3 rows).
    sql """
    DROP TABLE IF EXISTS `USR_V_KHZHSJ_ES_POC1`;
    DROP TABLE IF EXISTS `USR_TLBL_VAL_R1`;
    CREATE TABLE `USR_V_KHZHSJ_ES_POC1` (
    `khid` bigint NULL,
    `khh` bigint NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(`khid`, `khh`)
    DISTRIBUTED BY HASH(`khid`) BUCKETS 16
    PROPERTIES (
    "colocate_with" = "test_colocate_join_with_different_tabletsgroup1",
    "replication_allocation" = "tag.location.default: 1"
    );
    CREATE TABLE `USR_TLBL_VAL_R1` (
    `lbl_id` bigint NOT NULL COMMENT "标签ID",
    `khh` bigint NULL COMMENT "客户号"
    ) ENGINE=OLAP
    DUPLICATE KEY(`lbl_id`, `khh`)
    COMMENT '标签结果日表'
    PARTITION BY LIST (`lbl_id`)
    (PARTITION p0 VALUES IN ("0"),
    PARTITION p1 VALUES IN ("1"),
    PARTITION p29 VALUES IN ("29"),
    PARTITION p35 VALUES IN ("35"),
    PARTITION p57 VALUES IN ("57"),
    PARTITION p352 VALUES IN ("352"),
    PARTITION p402 VALUES IN ("402"),
    PARTITION p523 VALUES IN ("523"),
    PARTITION p2347 VALUES IN ("2347"),
    PARTITION p10376 VALUES IN ("10376"),
    PARTITION p42408 VALUES IN ("42408"),
    PARTITION p44410 VALUES IN ("44410"),
    PARTITION p48414 VALUES IN ("48414"),
    PARTITION p50416 VALUES IN ("50416"),
    PARTITION p52418 VALUES IN ("52418"),
    PARTITION p56422 VALUES IN ("56422"),
    PARTITION p60426 VALUES IN ("60426"),
    PARTITION p64430 VALUES IN ("64430"),
    PARTITION p66432 VALUES IN ("66432"),
    PARTITION p70436 VALUES IN ("70436"),
    PARTITION p72438 VALUES IN ("72438"),
    PARTITION p74440 VALUES IN ("74440"),
    PARTITION p78444 VALUES IN ("78444"),
    PARTITION p84450 VALUES IN ("84450"),
    PARTITION p86452 VALUES IN ("86452"),
    PARTITION p88454 VALUES IN ("88454"),
    PARTITION p90456 VALUES IN ("90456"),
    PARTITION p92458 VALUES IN ("92458"),
    PARTITION p94460 VALUES IN ("94460"),
    PARTITION p96462 VALUES IN ("96462"),
    PARTITION p98464 VALUES IN ("98464"),
    PARTITION p100466 VALUES IN ("100466"),
    PARTITION p102468 VALUES IN ("102468"),
    PARTITION p104470 VALUES IN ("104470"),
    PARTITION p106472 VALUES IN ("106472"),
    PARTITION p108474 VALUES IN ("108474"),
    PARTITION p110476 VALUES IN ("110476"),
    PARTITION p112478 VALUES IN ("112478"),
    PARTITION p114480 VALUES IN ("114480"),
    PARTITION p122488 VALUES IN ("122488"),
    PARTITION p124490 VALUES IN ("124490"),
    PARTITION p126492 VALUES IN ("126492"),
    PARTITION p130496 VALUES IN ("130496"),
    PARTITION p134500 VALUES IN ("134500"),
    PARTITION p150516 VALUES IN ("150516"),
    PARTITION p154520 VALUES IN ("154520"),
    PARTITION p158524 VALUES IN ("158524"),
    PARTITION p158525 VALUES IN ("158525"),
    PARTITION p1848141 VALUES IN ("1848141"),
    PARTITION p1848161 VALUES IN ("1848161"),
    PARTITION p1848177 VALUES IN ("1848177"),
    PARTITION p1848197 VALUES IN ("1848197"),
    PARTITION p1848218 VALUES IN ("1848218"))
    DISTRIBUTED BY HASH(`khh`) BUCKETS 16
    PROPERTIES (
    "colocate_with" = "test_colocate_join_with_different_tabletsgroup1",
    "replication_allocation" = "tag.location.default: 1"
    );
    insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
    insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
    insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
    insert into USR_TLBL_VAL_R1 values(48414, 10000007);
    insert into USR_TLBL_VAL_R1 values(94460, 10000007);
    insert into USR_TLBL_VAL_R1 values(60426, 10000007);
    """
    // qt_sql compares the join result against the recorded expected output
    // (9 rows = 3 left rows x 3 right rows, ordered by lbl_id); presumably
    // checked against the companion .out file per the regression-framework
    // `qt_` convention — verify in the framework docs.
    qt_sql """ select * from USR_V_KHZHSJ_ES_POC1 A,USR_TLBL_VAL_R1 B WHERE A.khid = B.khh order by lbl_id; """
}

0 comments on commit f53bb5d

Please sign in to comment.