diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e7fed76be8fa16..cc789f6e25b20b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -32,7 +32,6 @@ #include "pipeline/exec/operator.h" #include "pipeline/exec/sort_source_operator.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" -#include "pipeline/local_exchange/local_exchange_source_operator.h" #include "pipeline/pipeline_fragment_context.h" #include "util/runtime_profile.h" #include "util/uid_util.h" @@ -279,6 +278,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_txn_id(sink.tablet_sink_txn_id), _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()), + _dest_is_merge(sink.__isset.is_merge && sink.is_merge), _fragment_instance_ids(fragment_instance_ids) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || @@ -571,19 +571,13 @@ std::shared_ptr ExchangeSinkOperatorX::_create_buffer( // Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs. // (Note: This does not reduce the total number of RPCs.) // In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed. -/// TODO: Modify this to let FE handle the judgment instead of BE. std::shared_ptr ExchangeSinkOperatorX::get_sink_buffer( InstanceLoId sender_ins_id) { - if (!_child) { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "ExchangeSinkOperatorX did not correctly set the child."); - } // When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX, // it is an order-by scenario. // In this case, there is only one target instance, and no n * n RPC concurrency will occur. // Therefore, sharing a sink buffer is not necessary. - if (std::dynamic_pointer_cast(_child) || - std::dynamic_pointer_cast(_child)) { + if (_dest_is_merge) { return _create_buffer({sender_ins_id}); } if (_state->enable_shared_exchange_sink_buffer()) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 85575beb9f7e47..3d6eeb4b39e94f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -205,7 +205,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX get_sink_buffer(InstanceLoId sender_ins_id); vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; } @@ -260,6 +259,9 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX& _fragment_instance_ids; };