Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
huangqixiang committed Nov 24, 2024
1 parent 3633967 commit d6bde80
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2197,17 +2197,18 @@ private void computeScanRangeAssignmentByColocate(
int bucketNum = scanNode.getBucketNum();
scanNode.getFragment().setBucketNum(bucketNum);
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
Map<Integer, List<TScanRangeLocation>> preferBucketLocationMap = new HashMap<>();
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isEnableColocateQueryBalanceV2()) {
ScanRangeAssignmentColocate assignment = new ScanRangeAssignmentColocateV2();
for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
TScanRangeLocations locations = scanNode.bucketSeq2locations.get(bucketSeq).get(0);
preferBucketLocationMap.put(bucketSeq, locations.getLocations());
}
preferBucketLocationMap = assignment.computeScanRangeAssignmentByColocate(preferBucketLocationMap);
preferBucketLocationMap = assignment.computeScanRangeAssignmentByColocate(preferBucketLocationMap,
bucketSeqToAddress);
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
//fill scanRangeParamsList
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.qe;

import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocation;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -137,7 +138,8 @@ public int compareTo(LocationAssignment other) {
public abstract Map<Integer, Location> getSelectedBucketLocation(Map<Location, LocationAssignment> locationMap);

public Map<Integer, List<TScanRangeLocation>> computeScanRangeAssignmentByColocate(
Map<Integer, List<TScanRangeLocation>> bucketLocationMap) {
Map<Integer, List<TScanRangeLocation>> bucketLocationMap,
Map<Integer, TNetworkAddress> bucketSeqToAddress) {
Map<Location, LocationAssignment> locationAssignmentMap = new HashMap<>();
for (Map.Entry<Integer, List<TScanRangeLocation>> entry : bucketLocationMap.entrySet()) {
Integer bucketSeq = entry.getKey();
Expand All @@ -146,7 +148,14 @@ public Map<Integer, List<TScanRangeLocation>> computeScanRangeAssignmentByColoca
Location location = new Location(scanRangeLocation.getBackendId());
LocationAssignment locationAssignment = locationAssignmentMap.computeIfAbsent(location,
k -> new LocationAssignment(location));
locationAssignment.unselectBuckets.add(bucketSeq);
TNetworkAddress address = bucketSeqToAddress.get(bucketSeq);
if (address != null) {
if (Objects.equals(address, scanRangeLocation.getServer())) {
locationAssignment.selectedBuckets.add(bucketSeq);
}
} else {
locationAssignment.unselectBuckets.add(bucketSeq);
}
}
}
Map<Integer, Location> selectedBucketLocation = getSelectedBucketLocation(locationAssignmentMap);
Expand Down

0 comments on commit d6bde80

Please sign in to comment.