diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7880372e9e22841..21755d073bb43bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2197,6 +2197,8 @@ private void computeScanRangeAssignmentByColocate( int bucketNum = scanNode.getBucketNum(); scanNode.getFragment().setBucketNum(bucketNum); } + Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId()); Map> preferBucketLocationMap = new HashMap<>(); if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isEnableColocateQueryBalanceV2()) { ScanRangeAssignmentColocate assignment = new ScanRangeAssignmentColocateV2(); @@ -2204,10 +2206,9 @@ private void computeScanRangeAssignmentByColocate( TScanRangeLocations locations = scanNode.bucketSeq2locations.get(bucketSeq).get(0); preferBucketLocationMap.put(bucketSeq, locations.getLocations()); } - preferBucketLocationMap = assignment.computeScanRangeAssignmentByColocate(preferBucketLocationMap); + preferBucketLocationMap = assignment.computeScanRangeAssignmentByColocate(preferBucketLocationMap, + bucketSeqToAddress); } - Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); - BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId()); for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { //fill scanRangeParamsList List locations = scanNode.bucketSeq2locations.get(bucketSeq); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ScanRangeAssignmentColocate.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ScanRangeAssignmentColocate.java index 5c70be020404c5a..6ae092ea583bfb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ScanRangeAssignmentColocate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ScanRangeAssignmentColocate.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRangeLocation; import com.google.common.annotations.VisibleForTesting; @@ -137,7 +138,8 @@ public int compareTo(LocationAssignment other) { public abstract Map getSelectedBucketLocation(Map locationMap); public Map> computeScanRangeAssignmentByColocate( - Map> bucketLocationMap) { + Map> bucketLocationMap, + Map bucketSeqToAddress) { Map locationAssignmentMap = new HashMap<>(); for (Map.Entry> entry : bucketLocationMap.entrySet()) { Integer bucketSeq = entry.getKey(); @@ -146,7 +148,14 @@ public Map> computeScanRangeAssignmentByColoca Location location = new Location(scanRangeLocation.getBackendId()); LocationAssignment locationAssignment = locationAssignmentMap.computeIfAbsent(location, k -> new LocationAssignment(location)); - locationAssignment.unselectBuckets.add(bucketSeq); + TNetworkAddress address = bucketSeqToAddress.get(bucketSeq); + if (address != null) { + if (Objects.equals(address, scanRangeLocation.getServer())) { + locationAssignment.selectedBuckets.add(bucketSeq); + } + } else { + locationAssignment.unselectBuckets.add(bucketSeq); + } } } Map selectedBucketLocation = getSelectedBucketLocation(locationAssignmentMap);