From 02c9895670fc902a9a2aa8bb4da7e94277ba8225 Mon Sep 17 00:00:00 2001 From: 924060929 Date: Sun, 5 Jan 2025 00:13:16 +0800 Subject: [PATCH] fix --- .../plans/distribute/DistributePlanner.java | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java index e1ca4a9b2cc656a..c52209a8f4a3bfd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java @@ -192,14 +192,14 @@ private List filterInstancesWhichCanReceiveDataFromRemote( PipelineDistributedPlan receiverPlan, boolean enableShareHashTableForBroadcastJoin, ExchangeNode linkNode) { + if (linkNode.isSerialOperator()) { + return getFirstInstancePerWorker(receiverPlan.getInstanceJobs()); + } + boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream() .anyMatch(LocalShuffleAssignedJob.class::isInstance); if (useLocalShuffle) { - if (linkNode.isSerialOperator()) { - return getFirstInstancePerWorker(receiverPlan.getInstanceJobs()); - } else { - return receiverPlan.getInstanceJobs(); - } + return getLocalShuffleRemoteReceiverJob(receiverPlan); } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) { return getFirstInstancePerWorker(receiverPlan.getInstanceJobs()); } else { @@ -248,6 +248,26 @@ private List sortDestinationInstancesByBuckets( return Arrays.asList(instances); } + private List getLocalShuffleRemoteReceiverJob(PipelineDistributedPlan plan) { + List canReceiveDataFromRemote = Lists.newArrayListWithCapacity(plan.getInstanceJobs().size()); + boolean isFirst = true; + for (AssignedJob instanceJob : plan.getInstanceJobs()) { + LocalShuffleAssignedJob localShuffleJob = (LocalShuffleAssignedJob) instanceJob; + if (localShuffleJob instanceof LocalShuffleBucketJoinAssignedJob) { + LocalShuffleBucketJoinAssignedJob bucketJob = (LocalShuffleBucketJoinAssignedJob) localShuffleJob; + if (isFirst || !bucketJob.getAssignedJoinBucketIndexes().isEmpty()) { + canReceiveDataFromRemote.add(localShuffleJob); + } + } else { + if (isFirst || !instanceJob.getScanSource().isEmpty()) { + canReceiveDataFromRemote.add(localShuffleJob); + } + } + isFirst = false; + } + return canReceiveDataFromRemote; + } + private List getFirstInstancePerWorker(List instances) { Map firstInstancePerWorker = Maps.newLinkedHashMap(); for (AssignedJob instance : instances) {