Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Dec 12, 2024
1 parent afb2e66 commit 8864f7a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
22 changes: 20 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_fragment_context.h"
Expand Down Expand Up @@ -283,7 +284,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()),
_fragment_instance_ids(fragment_instance_ids) {
_fragment_instance_ids(fragment_instance_ids),
_dest_is_merge(sink.__isset.is_merge && sink.is_merge) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand Down Expand Up @@ -587,9 +589,25 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
// In this case, there is only one target instance, and no n * n RPC concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
std::dynamic_pointer_cast<SpillSortSourceOperatorX>(_child)) {
if (!_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}",
_dest_is_merge, _child->get_name());
}
return _create_buffer({sender_ins_id});
} else if (auto op = std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child);
op->is_merge_sort()) {
if (!_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}",
_dest_is_merge, _child->get_name());
}
return _create_buffer({sender_ins_id});
}
if (_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}",
_dest_is_merge, _child->get_name());
}

if (_state->enable_shared_exchange_sink_buffer()) {
return _sink_buffer;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
int _writer_count = 1;
const bool _enable_local_merge_sort;
const std::vector<TUniqueId>& _fragment_instance_ids;
const bool _dest_is_merge;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL

bool is_source() const override { return true; }

bool is_merge_sort() const { return _exchange_type == ExchangeType::LOCAL_MERGE_SORT; }

private:
friend class LocalExchangeSourceLocalState;

Expand Down

0 comments on commit 8864f7a

Please sign in to comment.