Skip to content

Commit

Permalink
[Enhancement] support pushdown subfield on topn
Browse files Browse the repository at this point in the history
Signed-off-by: Seaven <[email protected]>
  • Loading branch information
Seaven committed Jul 18, 2024
1 parent 8e5b2bd commit 2f083be
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.starrocks.sql.optimizer.operator.logical.LogicalJoinOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalProjectOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalTopNOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalUnionOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalWindowOperator;
import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
Expand Down Expand Up @@ -163,6 +164,40 @@ private OptExpression generatePushDownProject(OptExpression optExpression, Colum
optExpression);
}

@Override
public OptExpression visitLogicalTopN(OptExpression optExpression, Context context) {
if (context.pushDownExprRefs.isEmpty()) {
return visit(optExpression, context);
}

LogicalTopNOperator topN = optExpression.getOp().cast();

ColumnRefSet topNColumns = new ColumnRefSet();
topN.getOrderByElements().stream().map(Ordering::getColumnRef).forEach(topNColumns::union);
if (topN.getPartitionByColumns() != null) {
topN.getPartitionByColumns().forEach(topNColumns::union);
}

Context localContext = new Context();
Context childContext = new Context();
ColumnRefSet childSubfieldOutputs = new ColumnRefSet();
for (Map.Entry<ScalarOperator, ColumnRefSet> entry : context.pushDownExprUseColumns.entrySet()) {
ScalarOperator expr = entry.getKey();
ColumnRefSet useColumns = entry.getValue();

if (topNColumns.isIntersect(useColumns)) {
localContext.put(context.pushDownExprRefsIndex.get(expr), expr);
} else {
childContext.put(context.pushDownExprRefsIndex.get(expr), expr);
childSubfieldOutputs.union(context.pushDownExprRefsIndex.get(expr));
}
}
if (!localContext.pushDownExprRefs.isEmpty()) {
optExpression = generatePushDownProject(optExpression, childSubfieldOutputs, localContext);
}
return visitChildren(optExpression, childContext);
}

@Override
public OptExpression visitLogicalProject(OptExpression optExpression, Context context) {
if (context.pushDownExprRefs.isEmpty() && optExpression.inputAt(0).getInputs().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,15 @@ public void testArrayProjectOrderLimit() throws Exception {
"from s2 where a1[1] = 'Jiangsu' and a2[2] = 'GD' order by v1 limit 2;";
String plan = getVerboseExplain(sql);
Assert.assertTrue(plan, plan.contains(" Global Dict Exprs:\n" +
" 19: DictDefine(18: a2, [<place-holder>])\n" +
" 20: DictDefine(17: a1, [<place-holder>])\n" +
" 21: DictDefine(17: a1, [<place-holder>])\n" +
" 22: DictDefine(18: a2, [<place-holder>])\n" +
" 23: DictDefine(17: a1, [<place-holder>])\n" +
" 24: DictDefine(18: a2, [<place-holder>])\n" +
" 23: DictDefine(22: a2, [<place-holder>])\n" +
" 24: DictDefine(21: a1, [<place-holder>])\n" +
" 25: DictDefine(21: a1, [<place-holder>])\n" +
" 26: DictDefine(22: a2, [<place-holder>])\n" +
" 27: DictDefine(21: a1, [<place-holder>])\n" +
" 28: DictDefine(22: a2, [<place-holder>])\n" +
"\n" +
" 5:Decode\n" +
" | <dict id 19> : <string id 6>"));
" | <dict id 23> : <string id 6>"));
}

@Test
Expand Down Expand Up @@ -559,8 +559,8 @@ public void testArrayIfNullArray() throws Exception {
public void testArrayIfNullString() throws Exception {
String sql = "select ifnull(a1[1], a2[1]), a1, a2 from s2 order by v1";
String plan = getFragmentPlan(sql);
assertContains(plan, "ifnull(DictDecode(8: a1, [<place-holder>], 8: a1[1]), " +
"DictDecode(9: a2, [<place-holder>], 9: a2[1]))");
assertContains(plan, "ifnull(DictDecode(10: a1, [<place-holder>], 10: a1[1]), " +
"DictDecode(11: a2, [<place-holder>], 11: a2[1]))");
}

@Test
Expand Down Expand Up @@ -657,30 +657,30 @@ public void testAggreagateOrUnique() throws Exception {
"from s4 where a1[1] = 'Jiangsu' and a2[2] = 'GD' order by v1 limit 2;";
String plan = getVerboseExplain(sql);
Assert.assertTrue(plan, plan.contains(" Global Dict Exprs:\n" +
" 19: DictDefine(18: a2, [<place-holder>])\n" +
" 20: DictDefine(17: a1, [<place-holder>])\n" +
" 21: DictDefine(17: a1, [<place-holder>])\n" +
" 22: DictDefine(18: a2, [<place-holder>])\n" +
" 23: DictDefine(17: a1, [<place-holder>])\n" +
" 24: DictDefine(18: a2, [<place-holder>])\n" +
" 23: DictDefine(22: a2, [<place-holder>])\n" +
" 24: DictDefine(21: a1, [<place-holder>])\n" +
" 25: DictDefine(21: a1, [<place-holder>])\n" +
" 26: DictDefine(22: a2, [<place-holder>])\n" +
" 27: DictDefine(21: a1, [<place-holder>])\n" +
" 28: DictDefine(22: a2, [<place-holder>])\n" +
"\n" +
" 5:Decode\n" +
" | <dict id 19> : <string id 6>"));
" | <dict id 23> : <string id 6>"));

sql = "select array_length(a1), array_max(a2), array_min(a1), array_distinct(a1), array_sort(a2),\n" +
" reverse(a1), array_slice(a2, 2, 4), cardinality(a2)\n" +
"from s5 where a1[1] = 'Jiangsu' and a2[2] = 'GD' order by v1 limit 2;";
plan = getVerboseExplain(sql);
Assert.assertTrue(plan, plan.contains(" Global Dict Exprs:\n" +
" 19: DictDefine(18: a2, [<place-holder>])\n" +
" 20: DictDefine(17: a1, [<place-holder>])\n" +
" 21: DictDefine(17: a1, [<place-holder>])\n" +
" 22: DictDefine(18: a2, [<place-holder>])\n" +
" 23: DictDefine(17: a1, [<place-holder>])\n" +
" 24: DictDefine(18: a2, [<place-holder>])\n" +
" 23: DictDefine(22: a2, [<place-holder>])\n" +
" 24: DictDefine(21: a1, [<place-holder>])\n" +
" 25: DictDefine(21: a1, [<place-holder>])\n" +
" 26: DictDefine(22: a2, [<place-holder>])\n" +
" 27: DictDefine(21: a1, [<place-holder>])\n" +
" 28: DictDefine(22: a2, [<place-holder>])\n" +
"\n" +
" 5:Decode\n" +
" | <dict id 19> : <string id 6>"));
" | <dict id 23> : <string id 6>"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,4 +1114,23 @@ public void testOtherFunctionJson() throws Exception {
String plan = getFragmentPlan(sql);
assertContains(plan, "lower(CAST(coalesce(json_query(2: j1, 'a'), json_query(2: j1, 'b')) AS VARCHAR)) = 'x'");
}

@Test
public void testTopN() throws Exception {
String sql = "select array_length(a1) " +
"from (select * from pc0 order by a1 limit 10) x";
String plan = getVerboseExplain(sql);
assertNotContains(plan, "ColumnAccessPath");
assertContains(plan, " 1:TOP-N\n" +
" | order by: [7, ARRAY<INT>, true] ASC");
assertContains(plan, "3:Project\n" +
" | output columns:\n" +
" | 8 <-> array_length");

sql = "select array_length(a1) " +
"from (select * from pc0 order by v1 limit 10) x";
plan = getVerboseExplain(sql);
assertContains(plan, "ColumnAccessPath: [/a1/OFFSET]");
assertContains(plan, "1:Project");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1372,14 +1372,14 @@ public void testPruneSubfieldAfterWindow() throws Exception {
String sql = "select array_length(v3) from (select v3, row_number() over (order by v2) row_num from tarray) t " +
"where row_num = 1";
String plan = getFragmentPlan(sql);
assertContains(plan, "4:ANALYTIC\n" +
assertContains(plan, " 4:ANALYTIC\n" +
" | functions: [, row_number(), ]\n" +
" | order by: 2: v2 ASC\n" +
" | window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW\n" +
" | \n" +
" 3:Project\n" +
" 3:MERGING-EXCHANGE");
assertContains(plan, " 1:Project\n" +
" | <slot 2> : 2: v2\n" +
" | <slot 6> : array_length(3: v3)\n" +
" | limit: 1");
" | <slot 6> : array_length(3: v3)");
}
}

0 comments on commit 2f083be

Please sign in to comment.