Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jan 4, 2025
1 parent f038699 commit 02c9895
Showing 1 changed file with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
PipelineDistributedPlan receiverPlan,
boolean enableShareHashTableForBroadcastJoin,
ExchangeNode linkNode) {
if (linkNode.isSerialOperator()) {
return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
}

boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
.anyMatch(LocalShuffleAssignedJob.class::isInstance);
if (useLocalShuffle) {
if (linkNode.isSerialOperator()) {
return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
} else {
return receiverPlan.getInstanceJobs();
}
return getLocalShuffleRemoteReceiverJob(receiverPlan);
} else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
} else {
Expand Down Expand Up @@ -248,6 +248,26 @@ private List<AssignedJob> sortDestinationInstancesByBuckets(
return Arrays.asList(instances);
}

/**
 * Selects the instance jobs of a local-shuffle plan that are kept as receivers.
 *
 * <p>The first instance job of the plan is always kept. Every later instance is kept
 * only when it has something assigned to it: a non-empty list of assigned join bucket
 * indexes for a {@link LocalShuffleBucketJoinAssignedJob}, or a non-empty scan source
 * for any other job.
 *
 * <p>NOTE(review): the first instance is presumably kept unconditionally so the result
 * is never empty for a non-empty plan -- confirm against the caller.
 *
 * @param plan the plan whose instance jobs are filtered; every instance job is cast to
 *             {@link LocalShuffleAssignedJob}
 * @return the kept jobs, in the same order as {@code plan.getInstanceJobs()}
 * @throws ClassCastException if an instance job is not a {@link LocalShuffleAssignedJob}
 */
private List<AssignedJob> getLocalShuffleRemoteReceiverJob(PipelineDistributedPlan plan) {
    List<AssignedJob> allJobs = plan.getInstanceJobs();
    List<AssignedJob> receivers = Lists.newArrayListWithCapacity(allJobs.size());

    boolean firstAlreadySeen = false;
    for (AssignedJob candidate : allJobs) {
        // Cast up front: every job in a local-shuffle plan must be a LocalShuffleAssignedJob.
        LocalShuffleAssignedJob shuffleJob = (LocalShuffleAssignedJob) candidate;

        // The very first instance is accepted without inspecting its assignment.
        if (!firstAlreadySeen) {
            firstAlreadySeen = true;
            receivers.add(shuffleJob);
            continue;
        }

        boolean hasAssignment;
        if (shuffleJob instanceof LocalShuffleBucketJoinAssignedJob) {
            // Bucket-join jobs: judged by their assigned join bucket indexes.
            LocalShuffleBucketJoinAssignedJob bucketJoinJob = (LocalShuffleBucketJoinAssignedJob) shuffleJob;
            hasAssignment = !bucketJoinJob.getAssignedJoinBucketIndexes().isEmpty();
        } else {
            // All other jobs: judged by their scan source.
            hasAssignment = !candidate.getScanSource().isEmpty();
        }

        if (hasAssignment) {
            receivers.add(shuffleJob);
        }
    }
    return receivers;
}

private List<AssignedJob> getFirstInstancePerWorker(List<AssignedJob> instances) {
Map<DistributedPlanWorker, AssignedJob> firstInstancePerWorker = Maps.newLinkedHashMap();
for (AssignedJob instance : instances) {
Expand Down

0 comments on commit 02c9895

Please sign in to comment.