diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 619848110d4a33..061a62ea99b330 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -73,6 +73,14 @@ class Pipeline : public std::enable_shared_from_this { return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; } + // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH, + // data is processed and shuffled on the sink. + // Compared to PASSTHROUGH, this is a relatively heavy operation. + static bool heavy_operations_on_the_sink(ExchangeType idx) { + return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE || + idx == ExchangeType::ADAPTIVE_PASSTHROUGH; + } + bool need_to_local_exchange(const DataDistribution target_data_distribution, const int idx) const; void init_data_distribution() { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 01c14f1ddb3a61..48f2d96597c9cb 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -807,7 +807,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } case ExchangeType::ADAPTIVE_PASSTHROUGH: shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( - cur_pipe->num_tasks(), _num_instances, + std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); @@ -907,9 +907,13 @@ Status PipelineFragmentContext::_add_local_exchange( << " cur_pipe->operators().size(): " << cur_pipe->operators().size() << " new_pip->operators().size(): " << new_pip->operators().size(); - // Add passthrough local exchanger if necessary + // There are some local shuffles with relatively heavy operations on the sink. + // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck. + // Therefore, local passthrough is used to increase the concurrency of the sink. + // op -> local sink(1) -> local source (n) + // op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n) if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 && - Pipeline::is_hash_exchange(data_distribution.distribution_type)) { + Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) { RETURN_IF_ERROR(_add_local_exchange_impl( new_pip->operators().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,