Skip to content

Commit

Permalink
branch-3.0: [opt](exec) Use PASSTHROUGH to improve the concurrency of…
Browse files Browse the repository at this point in the history
… the ADAPTIVE_PASSTHROUGH SINK. #44925 (#44964)

Cherry-picked from #44925

Co-authored-by: Mryange <[email protected]>
  • Loading branch information
github-actions[bot] and Mryange authored Dec 4, 2024
1 parent fff936b commit abff86a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE;
}

// Tells whether the given exchange type performs its work on the sink side.
// HASH_SHUFFLE, BUCKET_HASH_SHUFFLE and ADAPTIVE_PASSTHROUGH all process and
// shuffle data on the sink, which is relatively heavy compared to PASSTHROUGH.
// Returns true for exactly those three types and false for every other one.
static bool heavy_operations_on_the_sink(ExchangeType idx) {
    switch (idx) {
    case ExchangeType::HASH_SHUFFLE:
    case ExchangeType::BUCKET_HASH_SHUFFLE:
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        return true;
    default:
        return false;
    }
}

bool need_to_local_exchange(const DataDistribution target_data_distribution,
const int idx) const;
void init_data_distribution() {
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
case ExchangeType::ADAPTIVE_PASSTHROUGH:
shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
cur_pipe->num_tasks(), _num_instances,
std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
? _runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
Expand Down Expand Up @@ -907,9 +907,13 @@ Status PipelineFragmentContext::_add_local_exchange(
<< " cur_pipe->operators().size(): " << cur_pipe->operators().size()
<< " new_pip->operators().size(): " << new_pip->operators().size();

// Add passthrough local exchanger if necessary
// Some local shuffles perform relatively heavy operations on the sink.
// If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
// Therefore, a local passthrough is used to increase the concurrency of the sink.
// Before: op -> local sink(1) -> local source (n)
// After:  op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n)
if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
Pipeline::is_hash_exchange(data_distribution.distribution_type)) {
Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
RETURN_IF_ERROR(_add_local_exchange_impl(
new_pip->operators().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
Expand Down

0 comments on commit abff86a

Please sign in to comment.