From 9e3d5fd7d4a2004c07dfb29a3761d7e89f891863 Mon Sep 17 00:00:00 2001 From: Mryange Date: Mon, 9 Dec 2024 17:06:16 +0800 Subject: [PATCH] test --- be/src/pipeline/exec/exchange_sink_operator.cpp | 5 ++++- be/src/pipeline/exec/exchange_sink_operator.h | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 04b9653e9c82996..65d1a5d2fb83ae2 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -327,7 +327,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_txn_id(sink.tablet_sink_txn_id), _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()), - _fragment_instance_ids(fragment_instance_ids) { + _fragment_instance_ids(fragment_instance_ids), + _dest_is_merge(sink.__isset.is_merge && sink.is_merge) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -772,8 +773,10 @@ std::shared_ptr ExchangeSinkOperatorX::get_sink_buffer( // Therefore, sharing a sink buffer is not necessary. if (std::dynamic_pointer_cast(_child) || std::dynamic_pointer_cast(_child)) { + DCHECK_EQ(_dest_is_merge, true); return _create_buffer({sender_ins_id}); } + DCHECK_EQ(_dest_is_merge, false); if (_state->enable_shared_exchange_sink_buffer()) { return _sink_buffer; } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 8d094b43f613f0d..79f16065039b7b4 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -294,6 +294,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX& _fragment_instance_ids; + const bool _dest_is_merge; }; } // namespace pipeline