From 1aa60f5186d19c3cb01a6d9732306ecf6df94606 Mon Sep 17 00:00:00 2001 From: Mryange Date: Wed, 4 Dec 2024 10:15:54 +0800 Subject: [PATCH] [opt](exec) Use PASSTHROUGH to improve the concurrency of the ADAPTIVE_PASSTHROUGH SINK. (#44925) before ``` op -> local sink(1) -> local source (n) ``` now ``` op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n) ``` profile ``` Pipeline : 1(instance_num=3): AGGREGATION_SINK_OPERATOR (id=4 , nereids_id=255): CROSS_JOIN_OPERATOR (id=3 , nereids_id=245): LOCAL_EXCHANGE_OPERATOR (ADAPTIVE_PASSTHROUGH) (id=-5): Pipeline : 2(instance_num=3): LOCAL_EXCHANGE_SINK_OPERATOR (ADAPTIVE_PASSTHROUGH) (id=-5): LOCAL_EXCHANGE_OPERATOR (PASSTHROUGH) (id=-6): Pipeline : 3(instance_num=1): LOCAL_EXCHANGE_SINK_OPERATOR (PASSTHROUGH) (id=-6): OLAP_SCAN_OPERATOR (id=2. nereids_id=234. table name = nums1(nums1)): ``` --- be/src/pipeline/pipeline.h | 8 ++++++++ be/src/pipeline/pipeline_fragment_context.cpp | 10 +++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index b969186b178bf7..afbe6c77596432 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -73,6 +73,14 @@ class Pipeline : public std::enable_shared_from_this { return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; } + // For HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, and ADAPTIVE_PASSTHROUGH, + // data is processed and shuffled on the sink. + // Compared to PASSTHROUGH, this is a relatively heavy operation. + static bool heavy_operations_on_the_sink(ExchangeType idx) { + return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE || + idx == ExchangeType::ADAPTIVE_PASSTHROUGH; + } + bool need_to_local_exchange(const DataDistribution target_data_distribution, const int idx) const; void init_data_distribution() { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 0775ef3fb19826..e6f257f9da792a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -814,7 +814,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } case ExchangeType::ADAPTIVE_PASSTHROUGH: shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( - cur_pipe->num_tasks(), _num_instances, + std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? cast_set( _runtime_state->query_options().local_exchange_free_blocks_limit) @@ -915,9 +915,13 @@ Status PipelineFragmentContext::_add_local_exchange( << " cur_pipe->operators().size(): " << cur_pipe->operators().size() << " new_pip->operators().size(): " << new_pip->operators().size(); - // Add passthrough local exchanger if necessary + // There are some local shuffles with relatively heavy operations on the sink. + // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck. + // Therefore, local passthrough is used to increase the concurrency of the sink. + // op -> local sink(1) -> local source (n) + // op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source (n) if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 && - Pipeline::is_hash_exchange(data_distribution.distribution_type)) { + Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) { RETURN_IF_ERROR(_add_local_exchange_impl( cast_set(new_pip->operators().size()), pool, new_pip, add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),