Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Dec 10, 2024
1 parent c72f349 commit 9e3d5fd
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()),
_fragment_instance_ids(fragment_instance_ids) {
_fragment_instance_ids(fragment_instance_ids),
_dest_is_merge(sink.__isset.is_merge && sink.is_merge) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand Down Expand Up @@ -772,8 +773,10 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
// Therefore, sharing a sink buffer is not necessary.
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
DCHECK_EQ(_dest_is_merge, true);
return _create_buffer({sender_ins_id});
}
DCHECK_EQ(_dest_is_merge, false);
if (_state->enable_shared_exchange_sink_buffer()) {
return _sink_buffer;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
int _writer_count = 1;
const bool _enable_local_merge_sort;
const std::vector<TUniqueId>& _fragment_instance_ids;
const bool _dest_is_merge;
};

} // namespace pipeline
Expand Down

0 comments on commit 9e3d5fd

Please sign in to comment.