diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index aa893fc0a26f2e..4a0b125f9a03b7 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -31,6 +31,7 @@ #include "pipeline/dependency.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/sort_source_operator.h" +#include "pipeline/exec/spill_sort_source_operator.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" #include "pipeline/local_exchange/local_exchange_source_operator.h" #include "pipeline/pipeline_fragment_context.h" @@ -283,7 +284,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_txn_id(sink.tablet_sink_txn_id), _t_tablet_sink_exprs(&sink.tablet_sink_exprs), _enable_local_merge_sort(state->enable_local_merge_sort()), - _fragment_instance_ids(fragment_instance_ids) { + _fragment_instance_ids(fragment_instance_ids), + _dest_is_merge(sink.__isset.is_merge && sink.is_merge) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -587,9 +589,27 @@ std::shared_ptr ExchangeSinkOperatorX::get_sink_buffer( // In this case, there is only one target instance, and no n * n RPC concurrency will occur. // Therefore, sharing a sink buffer is not necessary. if (std::dynamic_pointer_cast(_child) || - std::dynamic_pointer_cast(_child)) { + std::dynamic_pointer_cast(_child)) { + if (!_dest_is_merge) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}", + _dest_is_merge, _child->get_name()); + } return _create_buffer({sender_ins_id}); + } else if (auto op = std::dynamic_pointer_cast(_child)) { + if (op->is_merge_sort()) { + if (!_dest_is_merge) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, + "_dest_is_merge {}. child name {}", _dest_is_merge, + _child->get_name()); + } + return _create_buffer({sender_ins_id}); + } } + if (_dest_is_merge) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}", + _dest_is_merge, _child->get_name()); + } + if (_state->enable_shared_exchange_sink_buffer()) { return _sink_buffer; } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index e88389b1d7bb5a..686e3355e29bf0 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -260,6 +260,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX& _fragment_instance_ids; + const bool _dest_is_merge; }; } // namespace pipeline diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index 3c706d50182538..7ca7b738be1982 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -81,6 +81,8 @@ class LocalExchangeSourceOperatorX final : public OperatorX sor } SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0); ((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo()); + if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) { + inputFragment.getChild(0).getSink().setMerge(true); + } sortNode.setMergeByExchange(); sortNode.setChildrenDistributeExprLists(distributeExprLists); } @@ -2198,6 +2201,9 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN topN, PlanTra ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot(); exchangeNode.setChildrenDistributeExprLists(distributeExprLists); exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo()); + if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) { + inputFragment.getChild(0).getSink().setMerge(true); + } exchangeNode.setLimit(topN.getLimit()); exchangeNode.setOffset(topN.getOffset()); ((SortNode) exchangeNode.getChild(0)).setMergeByExchange(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index 8d6daa2f8b72b4..320fcae9eb0f1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -39,6 +39,7 @@ public abstract class DataSink { // Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink. protected PlanFragment fragment; + protected boolean isMerge = false; /** * Return an explain string for the DataSink. Each line of the explain will be @@ -77,4 +78,12 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException { throw new AnalysisException("Unknown table type " + table.getType()); } } + + public boolean isMerge() { + return isMerge; + } + + public void setMerge(boolean merge) { + isMerge = merge; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index ef42190fa25004..ceda692aa4b01f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -176,6 +176,9 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId()); strBuilder.append("\n"); } + if (isMerge) { + strBuilder.append("IS_MERGE: true\n"); + } return strBuilder.toString(); } @@ -234,6 +237,7 @@ protected TDataSink toThrift() { tStreamSink.addToTabletSinkExprs(expr.treeToThrift()); } } + tStreamSink.setIsMerge(isMerge); tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); return result; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index ed7ccee69cd9a1..03a23c2c532ac3 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -189,6 +189,7 @@ struct TDataStreamSink { 11: optional i64 tablet_sink_txn_id 12: optional Types.TTupleId tablet_sink_tuple_id 13: optional list tablet_sink_exprs + 14: optional bool is_merge } struct TMultiCastDataStreamSink {