Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[only test Pr/englefly/45196] #45198

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pipeline/dependency.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_fragment_context.h"
Expand Down Expand Up @@ -283,7 +284,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
_tablet_sink_txn_id(sink.tablet_sink_txn_id),
_t_tablet_sink_exprs(&sink.tablet_sink_exprs),
_enable_local_merge_sort(state->enable_local_merge_sort()),
_fragment_instance_ids(fragment_instance_ids) {
_fragment_instance_ids(fragment_instance_ids),
_dest_is_merge(sink.__isset.is_merge && sink.is_merge) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
Expand Down Expand Up @@ -587,9 +589,27 @@ std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
// In this case, there is only one target instance, and no n * n RPC concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child) ||
std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
std::dynamic_pointer_cast<SpillSortSourceOperatorX>(_child)) {
if (!_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}",
_dest_is_merge, _child->get_name());
}
return _create_buffer({sender_ins_id});
} else if (auto op = std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {
if (op->is_merge_sort()) {
if (!_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"_dest_is_merge {}. child name {}", _dest_is_merge,
_child->get_name());
}
return _create_buffer({sender_ins_id});
}
}
if (_dest_is_merge) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "_dest_is_merge {}. child name {}",
_dest_is_merge, _child->get_name());
}

if (_state->enable_shared_exchange_sink_buffer()) {
return _sink_buffer;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
int _writer_count = 1;
const bool _enable_local_merge_sort;
const std::vector<TUniqueId>& _fragment_instance_ids;
const bool _dest_is_merge;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL

bool is_source() const override { return true; }

bool is_merge_sort() const { return _exchange_type == ExchangeType::LOCAL_MERGE_SORT; }

private:
friend class LocalExchangeSourceLocalState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2147,6 +2147,9 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
}
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
inputFragment.getChild(0).getSink().setMerge(true);
}
sortNode.setMergeByExchange();
sortNode.setChildrenDistributeExprLists(distributeExprLists);
}
Expand Down Expand Up @@ -2198,6 +2201,9 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
exchangeNode.setChildrenDistributeExprLists(distributeExprLists);
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
inputFragment.getChild(0).getSink().setMerge(true);
}
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public abstract class DataSink {
// Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink.
protected PlanFragment fragment;
protected boolean isMerge = false;

/**
* Return an explain string for the DataSink. Each line of the explain will be
Expand Down Expand Up @@ -77,4 +78,12 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException {
throw new AnalysisException("Unknown table type " + table.getType());
}
}

public boolean isMerge() {
return isMerge;
}

public void setMerge(boolean merge) {
isMerge = merge;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) {
strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId());
strBuilder.append("\n");
}
if (isMerge) {
strBuilder.append("IS_MERGE: true\n");
}

return strBuilder.toString();
}
Expand Down Expand Up @@ -234,6 +237,7 @@ protected TDataSink toThrift() {
tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
}
}
tStreamSink.setIsMerge(isMerge);
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ struct TDataStreamSink {
11: optional i64 tablet_sink_txn_id
12: optional Types.TTupleId tablet_sink_tuple_id
13: optional list<Exprs.TExpr> tablet_sink_exprs
14: optional bool is_merge
}

struct TMultiCastDataStreamSink {
Expand Down
Loading