Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](nereids) add is_merge tag for data sink #45196

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,9 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
}
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
inputFragment.getChild(0).getSink().setMerge(true);
}
sortNode.setMergeByExchange();
sortNode.setChildrenDistributeExprLists(distributeExprLists);
}
Expand Down Expand Up @@ -2196,6 +2199,9 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
exchangeNode.setChildrenDistributeExprLists(distributeExprLists);
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) {
inputFragment.getChild(0).getSink().setMerge(true);
}
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public abstract class DataSink {
// Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink.
protected PlanFragment fragment;
protected boolean isMerge = false;

/**
* Return an explain string for the DataSink. Each line of the explain will be
Expand Down Expand Up @@ -77,4 +78,12 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException {
throw new AnalysisException("Unknown table type " + table.getType());
}
}

public boolean isMerge() {
return isMerge;
}

public void setMerge(boolean merge) {
isMerge = merge;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) {
strBuilder.append(prefix).append(" PROJECTION TUPLE: ").append(outputTupleDesc.getId());
strBuilder.append("\n");
}
if (isMerge) {
strBuilder.append("IS_MERGE: true\n");
}

return strBuilder.toString();
}
Expand Down Expand Up @@ -234,6 +237,7 @@ protected TDataSink toThrift() {
tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
}
}
tStreamSink.setIsMerge(isMerge);
tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
result.setStreamSink(tStreamSink);
return result;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ struct TDataStreamSink {
11: optional i64 tablet_sink_txn_id
12: optional Types.TTupleId tablet_sink_tuple_id
13: optional list<Exprs.TExpr> tablet_sink_exprs
14: optional bool is_merge
englefly marked this conversation as resolved.
Show resolved Hide resolved
}

struct TMultiCastDataStreamSink {
Expand Down
Loading