Skip to content

Commit

Permalink
[BugFix] Fix star count return null rather zero bug for mv rewrite (b…
Browse files Browse the repository at this point in the history
…ackport #49288) (#49315)

Signed-off-by: shuming.li <[email protected]>
Co-authored-by: shuming.li <[email protected]>
  • Loading branch information
mergify[bot] and LiShuMing authored Aug 5, 2024
1 parent 394d906 commit a92ae00
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,13 @@ private List<ScalarOperator> rewriteGroupKeys(List<ScalarOperator> groupKeys,

/**
* Rewrite aggregation by using MV.
* @param aggregates aggregation column ref -> scalar op to be rewritten
* @param equationRewriter equivalence class rewriter
* @param mapping output mapping for rewrite: column ref -> column ref
* @param queryColumnSet column set of query
* @param newProjection new projection mapping: col ref -> scalar op which is used for projection of new rewritten aggregate
* @param hasGroupByKeys whether query has group by keys or not
* @return
*/
private Map<ColumnRefOperator, CallOperator> rewriteAggregates(Map<ColumnRefOperator, ScalarOperator> aggregates,
EquationRewriter equationRewriter,
Expand All @@ -643,11 +650,21 @@ private Map<ColumnRefOperator, CallOperator> rewriteAggregates(Map<ColumnRefOper
aggCall.toString());
return null;
}
ColumnRefOperator oldColRef = (ColumnRefOperator) aggregateMapping.get(entry.getKey());
newAggregations.put(oldColRef, newAggregate);
newProjection.put(oldColRef, genRollupProject(aggCall, oldColRef, hasGroupByKeys));
ColumnRefOperator origColRef = entry.getKey();
ColumnRefOperator newAggColRef = mvRewriteContext.getMaterializationContext().getQueryRefFactory().create(
origColRef, newAggregate.getType(), newAggregate.isNullable());
newAggregations.put(newAggColRef, newAggregate);
// No needs to set `newProjections` since it will use aggColRefToAggMap to construct new projections,
// otherwise it will cause duplicate projections(or wrong projections).
// eg:
// query: oldCol1 -> count()
// newAggregations: newCol1 -> sum(oldCol1)
// aggColRefToAggMap: oldCol1 -> coalesce(newCol1, 0)
// It will generate new projections as below:
// newProjections: oldCol1 -> coalesce(newCol1, 0)
ScalarOperator newProjectOp = genRollupProject(aggCall, newAggColRef, hasGroupByKeys);
aggregateMapping.put(origColRef, newProjectOp);
}

return newAggregations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5470,12 +5470,10 @@ public void testColumnPruningWithPredicates() {
" group by lo_orderdate" +
" having sum(lo_tax) > 100";
MVRewriteChecker checker = testRewriteOK(mv, query);
checker.contains("4:Project\n" +
checker.contains(" 4:Project\n" +
" | <slot 6> : 21: lo_orderdate\n" +
" | <slot 18> : 26: sum\n" +
" | <slot 19> : 27: sum\n" +
" | <slot 26> : clone(26: sum)\n" +
" | <slot 27> : clone(27: sum)\n" +
" | \n" +
" 3:AGGREGATE (merge finalize)");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ public void testAggregateMvRewrite() throws Exception {
" | <slot 18> : 18: t1d\n" +
" | <slot 19> : 19: total_sum\n" +
" | <slot 20> : 20: total_num\n" +
" | <slot 23> : 17: v1 + 1");
" | <slot 25> : 17: v1 + 1");

MaterializedView mv1 = getMv("test", "agg_join_mv_1");
dropMv("test", "agg_join_mv_1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.qe.SessionVariable;
import com.starrocks.sql.common.QueryDebugOptions;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.utframe.UtFrameUtils;
Expand All @@ -33,6 +34,8 @@

import java.util.Set;

import static com.starrocks.sql.plan.PlanTestNoneDBBase.assertContains;

public class ReplayWithMVFromDumpTest extends ReplayFromDumpTestBase {

@BeforeClass
Expand Down Expand Up @@ -229,4 +232,23 @@ public void testSyncMVRewriteWithDict() throws Exception {
// "nmock_040, nmock_041 from tbl_mock_001 order by nmock_002;";
Assert.assertFalse(replayPair.second, replayPair.second.contains("mv_tbl_mock_001"));
}

@Test
public void testMV_CountStarRewrite() throws Exception {
QueryDebugOptions debugOptions = new QueryDebugOptions();
debugOptions.setEnableQueryTraceLog(true);
connectContext.getSessionVariable().setQueryDebugOptions(debugOptions.toString());
Pair<QueryDumpInfo, String> replayPair =
getPlanFragment(getDumpInfoFromFile("query_dump/materialized-view/count_star_rewrite"),
connectContext.getSessionVariable(), TExplainLevel.NORMAL);
assertContains(replayPair.second, "tbl_mock_067");
// NOTE: OUTPUT EXPRS must refer to coalesce column ref
assertContains(replayPair.second, " OUTPUT EXPRS:59: count\n" +
" PARTITION: RANDOM\n" +
"\n" +
" RESULT SINK\n" +
"\n" +
" 3:Project\n");
assertContains(replayPair.second, " | <slot 59> : coalesce(");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ order by
[result]
TOP-N (order by [[9: l_returnflag ASC NULLS FIRST, 10: l_linestatus ASC NULLS FIRST]])
TOP-N (order by [[9: l_returnflag ASC NULLS FIRST, 10: l_linestatus ASC NULLS FIRST]])
AGGREGATE ([GLOBAL] aggregate [{113: count=sum(113: count), 114: sum=sum(114: sum), 115: count=sum(115: count), 116: count=sum(116: count), 108: sum=sum(108: sum), 109: sum=sum(109: sum), 110: sum=sum(110: sum), 111: sum=sum(111: sum), 112: count=sum(112: count)}] group by [[34: l_returnflag, 35: l_linestatus]] having [null]
EXCHANGE SHUFFLE[34, 35]
AGGREGATE ([LOCAL] aggregate [{113: count=sum(39: count_base_price), 114: sum=sum(40: sum_discount), 115: count=sum(44: count_order), 116: count=sum(41: count_discount), 108: sum=sum(36: sum_qty), 109: sum=sum(38: sum_base_price), 110: sum=sum(42: sum_disc_price), 111: sum=sum(43: sum_charge), 112: count=sum(37: count_qty)}] group by [[34: l_returnflag, 35: l_linestatus]] having [null]
SCAN (mv[lineitem_agg_mv1] columns[33: l_shipdate, 34: l_returnflag, 35: l_linestatus, 36: sum_qty, 37: count_qty, 38: sum_base_price, 39: count_base_price, 40: sum_discount, 41: count_discount, 42: sum_disc_price, 43: sum_charge, 44: count_order] predicate[33: l_shipdate <= 1998-12-01])
AGGREGATE ([GLOBAL] aggregate [{117: sum=sum(117: sum), 118: sum=sum(118: sum), 119: sum=sum(119: sum), 120: sum=sum(120: sum), 121: count=sum(121: count), 122: count=sum(122: count), 123: sum=sum(123: sum), 124: count=sum(124: count), 125: count=sum(125: count)}] group by [[38: l_returnflag, 39: l_linestatus]] having [null]
EXCHANGE SHUFFLE[38, 39]
AGGREGATE ([LOCAL] aggregate [{117: sum=sum(40: sum_qty), 118: sum=sum(42: sum_base_price), 119: sum=sum(46: sum_disc_price), 120: sum=sum(47: sum_charge), 121: count=sum(41: count_qty), 122: count=sum(43: count_base_price), 123: sum=sum(44: sum_discount), 124: count=sum(48: count_order), 125: count=sum(45: count_discount)}] group by [[38: l_returnflag, 39: l_linestatus]] having [null]
SCAN (mv[lineitem_agg_mv1] columns[37: l_shipdate, 38: l_returnflag, 39: l_linestatus, 40: sum_qty, 41: count_qty, 42: sum_base_price, 43: count_base_price, 44: sum_discount, 45: count_discount, 46: sum_disc_price, 47: sum_charge, 48: count_order] predicate[37: l_shipdate <= 1998-12-01])
[end]

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ order by
[result]
TOP-N (order by [[10: l_returnflag ASC NULLS FIRST, 11: l_linestatus ASC NULLS FIRST]])
TOP-N (order by [[10: l_returnflag ASC NULLS FIRST, 11: l_linestatus ASC NULLS FIRST]])
AGGREGATE ([GLOBAL] aggregate [{113: sum=sum(33: sum_base_price), 114: sum=sum(35: sum_disc_price), 115: sum=sum(36: sum_charge), 116: count=sum(31: total_cnt), 117: count=sum(31: total_cnt), 118: count=sum(31: total_cnt), 119: sum=sum(34: sum_discount), 120: count=sum(31: total_cnt), 112: sum=sum(32: sum_qty)}] group by [[29: l_returnflag, 30: l_linestatus]] having [null]
SCAN (mv[lineitem_agg_mv1] columns[28: l_shipdate, 29: l_returnflag, 30: l_linestatus, 31: total_cnt, 32: sum_qty, 33: sum_base_price, 34: sum_discount, 35: sum_disc_price, 36: sum_charge] predicate[28: l_shipdate <= 1998-12-01])
AGGREGATE ([GLOBAL] aggregate [{129: count=sum(46: total_cnt), 121: sum=sum(47: sum_qty), 122: sum=sum(48: sum_base_price), 123: sum=sum(50: sum_disc_price), 124: sum=sum(51: sum_charge), 125: count=sum(46: total_cnt), 126: count=sum(46: total_cnt), 127: count=sum(46: total_cnt), 128: sum=sum(49: sum_discount)}] group by [[44: l_returnflag, 45: l_linestatus]] having [null]
SCAN (mv[lineitem_agg_mv1] columns[43: l_shipdate, 44: l_returnflag, 45: l_linestatus, 46: total_cnt, 47: sum_qty, 48: sum_base_price, 49: sum_discount, 50: sum_disc_price, 51: sum_charge] predicate[43: l_shipdate <= 1998-12-01])
[end]

Loading

0 comments on commit a92ae00

Please sign in to comment.