Skip to content

Commit

Permalink
[fix](local shuffle) Fix bucket local shuffle
Browse files (browse the repository at this point in the history)
  • Loading branch information
Gabriel39 committed Nov 22, 2024
1 parent 55ebcb3 commit ba28f15
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 13 deletions.
7 changes: 4 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,8 @@ void IRuntimeFilter::update_state() {
// In pipelineX, runtime filters will be ready or timeout before open phase.
if (expected == RuntimeFilterState::NOT_READY) {
DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
- COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+ COUNTER_SET(_wait_timer,
+ int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS));
_rf_state_atomic = RuntimeFilterState::TIME_OUT;
}
}
Expand All @@ -1269,7 +1270,7 @@ PrimitiveType IRuntimeFilter::column_type() const {

void IRuntimeFilter::signal() {
DCHECK(is_consumer());
- COUNTER_SET(_wait_timer, MonotonicMillis() - registration_time_);
+ COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - registration_time_) * NANOS_PER_MILLIS));
_rf_state_atomic.store(RuntimeFilterState::READY);
if (!_filter_timer.empty()) {
for (auto& timer : _filter_timer) {
Expand Down Expand Up @@ -1514,7 +1515,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_time) {
_profile->add_info_string("RealRuntimeFilterType", to_string(_wrapper->get_real_type()));
_profile->add_info_string("LocalMergeTime",
- std::to_string(local_merge_time / 1000000000.0) + " s");
+ std::to_string((double)local_merge_time / NANOS_PER_SEC) + " s");
}

std::string IRuntimeFilter::debug_string() const {
Expand Down
14 changes: 6 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,6 @@ protected void computeFragmentHosts() throws Exception {

FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
- instanceParam.perNodeSharedScans.put(planNodeId, sharedScan);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = sharedScan;
Expand Down Expand Up @@ -2755,13 +2754,11 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
null, addressScanRange.getKey(), 0, params);

for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
- instanceParam.addBucketSeq(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
- instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
Expand All @@ -2773,6 +2770,12 @@ private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanc
params.instanceExecParams.add(new FInstanceExecParam(
null, addressScanRange.getKey(), 0, params));
}
+ int index = 0;
+ for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
+ params.instanceExecParams.get(index % params.instanceExecParams.size())
+ .addBucketSeq(nodeScanRangeMap.first);
+ index++;
+ }
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
Expand Down Expand Up @@ -3106,10 +3109,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
- Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
- perNodeSharedScans = Maps.newHashMap();
}
if (!res.containsKey(instanceExecParam.host)) {
TPipelineFragmentParams params = new TPipelineFragmentParams();
Expand Down Expand Up @@ -3137,7 +3138,6 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {

params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
- params.setPerNodeSharedScans(perNodeSharedScans);
params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
Expand All @@ -3162,7 +3162,6 @@ Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum) {

localParams.setFragmentInstanceId(instanceExecParam.instanceId);
localParams.setPerNodeScanRanges(scanRanges);
- localParams.setPerNodeSharedScans(perNodeSharedScans);
localParams.setSenderId(i);
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
Expand Down Expand Up @@ -3310,7 +3309,6 @@ static class FInstanceExecParam {
TUniqueId instanceId;
TNetworkAddress host;
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newHashMap();
- Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap();

int perFragmentInstanceIdx;

Expand Down
4 changes: 2 additions & 2 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ struct TPipelineInstanceParams {
4: optional i32 sender_id
5: optional TRuntimeFilterParams runtime_filter_params
6: optional i32 backend_num
- 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+ 7: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
8: optional list<i32> topn_filter_source_node_ids // deprecated after we set topn_filter_descs
9: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
}
Expand Down Expand Up @@ -816,7 +816,7 @@ struct TPipelineFragmentParams {
33: optional i32 num_local_sink
34: optional i32 num_buckets
35: optional map<i32, i32> bucket_seq_to_instance_idx
- 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans
+ 36: optional map<Types.TPlanNodeId, bool> per_node_shared_scans // deprecated
37: optional i32 parallel_instances
38: optional i32 total_instances
39: optional map<i32, i32> shuffle_idx_to_instance_idx
Expand Down

0 comments on commit ba28f15

Please sign in to comment.