Skip to content

Commit

Permalink
[opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIV…
Browse files Browse the repository at this point in the history
…E_PASSTHROUGH SINK. (#44925)

before
```
op -> local sink(1) -> local source (n)
```
now
```
op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
```

profile 
```
                  Pipeline  :  1(instance_num=3):
                      AGGREGATION_SINK_OPERATOR  (id=4  ,  nereids_id=255):
                          CROSS_JOIN_OPERATOR  (id=3  ,  nereids_id=245):
                              LOCAL_EXCHANGE_OPERATOR  (ADAPTIVE_PASSTHROUGH)  (id=-5):
                  Pipeline  :  2(instance_num=3):
                      LOCAL_EXCHANGE_SINK_OPERATOR  (ADAPTIVE_PASSTHROUGH)  (id=-5):
                          LOCAL_EXCHANGE_OPERATOR  (PASSTHROUGH)  (id=-6):
                  Pipeline  :  3(instance_num=1):
                      LOCAL_EXCHANGE_SINK_OPERATOR  (PASSTHROUGH)  (id=-6):
                          OLAP_SCAN_OPERATOR  (id=2.  nereids_id=234.  table  name  =  nums1(nums1)):
```
  • Loading branch information
Mryange authored Dec 4, 2024
1 parent b8dcac0 commit 1aa60f5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE;
}

// For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH,
// data is processed and shuffled on the sink.
// Compared to PASSTHROUGH, this is a relatively heavy operation.
static bool heavy_operations_on_the_sink(ExchangeType idx) {
return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE ||
idx == ExchangeType::ADAPTIVE_PASSTHROUGH;
}

bool need_to_local_exchange(const DataDistribution target_data_distribution,
const int idx) const;
void init_data_distribution() {
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
case ExchangeType::ADAPTIVE_PASSTHROUGH:
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? cast_set<int>(
_runtime_state->query_options().local_exchange_free_blocks_limit)
Expand Down Expand Up @@ -915,9 +915,13 @@ Status PipelineFragmentContext::_add_local_exchange(
<< " cur_pipe->operators().size(): " << cur_pipe->operators().size()
<< " new_pip->operators().size(): " << new_pip->operators().size();

// Add passthrough local exchanger if necessary
// There are some local shuffles with relatively heavy operations on the sink.
// If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
// Therefore, local passthrough is used to increase the concurrency of the sink.
// op -> local sink(1) -> local source (n)
// op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n)
if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
Pipeline::is_hash_exchange(data_distribution.distribution_type)) {
Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
RETURN_IF_ERROR(_add_local_exchange_impl(
cast_set<int>(new_pip->operators().size()), pool, new_pip,
add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
Expand Down

0 comments on commit 1aa60f5

Please sign in to comment.