Skip to content

Commit

Permalink
[refine](exchange) Use is_merge from FE for judgment instead of relyi…
Browse files Browse the repository at this point in the history
…ng on the operator in BE. (apache#45592)

### What problem does this PR solve?

Previously, determining whether the receiver is a merge exchange relied
on checking if the specific operator was a sort node.
However, this approach is incorrect because there are many types of sort
operators: regular sort, partitioned sort, and spill sort.
  • Loading branch information
Mryange authored Dec 19, 2024
1 parent 2a7c2e3 commit a3279a2
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 9 deletions.
10 changes: 2 additions & 8 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_fragment_context.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -279,6 +278,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()),
_dest_is_merge(sink.__isset.is_merge && sink.is_merge),
_fragment_instance_ids(fragment_instance_ids) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
Expand Down Expand Up @@ -571,19 +571,13 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer(
// Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
InstanceLoId sender_ins_id) {
if (!_child) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"ExchangeSinkOperatorX did not correctly set the child.");
}
// When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX,
// it is an order-by scenario.
// In this case, there is only one target instance, and no n * n RPC concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
if (_dest_is_merge) {
return _create_buffer({sender_ins_id});
}
if (_state->enable_shared_exchange_sink_buffer()) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
// Therefore, a shared sink buffer is used here to limit the number of concurrent RPCs.
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id);
vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; }

Expand Down Expand Up @@ -260,6 +259,9 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
size_t _data_processed = 0;
int _writer_count = 1;
const bool _enable_local_merge_sort;
// If dest_is_merge is true, it indicates that the corresponding receiver is a VMERGING-EXCHANGE.
// The receiver will sort the collected data, so the sender must ensure that the data sent is ordered.
const bool _dest_is_merge;
const std::vector<TUniqueId>& _fragment_instance_ids;
};

Expand Down

0 comments on commit a3279a2

Please sign in to comment.