Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Dec 12, 2024
1 parent 9e3d5fd commit 168ef7c
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_fragment_context.h"
Expand Down Expand Up @@ -772,11 +773,16 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
// In this case, there is only one target instance, and no n * n RPC concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<SpillSortSourceOperatorX>(_child)) {
DCHECK_EQ(_dest_is_merge, true);
return _create_buffer({sender_ins_id});
}
DCHECK_EQ(_dest_is_merge, false);
if (_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}",
_dest_is_merge, _child->get_name());
}

if (_state->enable_shared_exchange_sink_buffer()) {
return _sink_buffer;
}
Expand Down

0 comments on commit 168ef7c

Please sign in to comment.