Skip to content

Commit

Permalink
branch-2.1: [fix](mtmv) Fix generate hyper graph wrongly when has fil…
Browse files Browse the repository at this point in the history
…ter which can not push down #43539 (#43946)

Cherry-picked from #43539

Co-authored-by: seawinde <[email protected]>
  • Loading branch information
github-actions[bot] and seawinde authored Nov 16, 2024
1 parent b3022df commit 91dcc0e
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private Pair<BitSet, Long> buildForMv(Plan plan) {
LogicalFilter<?> filter = (LogicalFilter<?>) plan;
Pair<BitSet, Long> child = this.buildForMv(filter.child());
this.addFilter(filter, child);
return Pair.of(new BitSet(), child.second);
return Pair.of(child.first, child.second);
}

// process Other Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,12 +724,14 @@ protected SplitPredicate predicatesCompensate(
SlotMapping queryToViewMapping = viewToQuerySlotMapping.inverse();
// try to use
boolean valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, cascadesContext);
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, queryStructInfo,
viewStructInfo, cascadesContext);
if (!valid) {
queryStructInfo = queryStructInfo.withPredicates(
queryStructInfo.getPredicates().merge(comparisonResult.getQueryAllPulledUpExpressions()));
valid = containsNullRejectSlot(requireNoNullableViewSlot,
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping, cascadesContext);
queryStructInfo.getPredicates().getPulledUpPredicates(), queryToViewMapping,
queryStructInfo, viewStructInfo, cascadesContext);
}
if (!valid) {
return SplitPredicate.INVALID_INSTANCE;
Expand Down Expand Up @@ -779,6 +781,8 @@ protected SplitPredicate predicatesCompensate(
private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
Set<Expression> queryPredicates,
SlotMapping queryToViewMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
CascadesContext cascadesContext) {
Set<Expression> queryPulledUpPredicates = queryPredicates.stream()
.flatMap(expr -> ExpressionUtils.extractConjunction(expr).stream())
Expand All @@ -791,16 +795,37 @@ private boolean containsNullRejectSlot(Set<Set<Slot>> requireNoNullableViewSlot,
return expr;
})
.collect(Collectors.toSet());
Set<Expression> nullRejectPredicates = ExpressionUtils.inferNotNull(queryPulledUpPredicates, cascadesContext);
Set<Expression> queryUsedNeedRejectNullSlotsViewBased = nullRejectPredicates.stream()
.map(expression -> TypeUtils.isNotNull(expression).orElse(null))
.filter(Objects::nonNull)
.map(expr -> ExpressionUtils.replace((Expression) expr, queryToViewMapping.toSlotReferenceMap()))
Set<Expression> queryNullRejectPredicates =
ExpressionUtils.inferNotNull(queryPulledUpPredicates, cascadesContext);
if (queryPulledUpPredicates.containsAll(queryNullRejectPredicates)) {
// Query has no null reject predicates, return
return false;
}
// Get query null reject predicate slots
Set<Expression> queryNullRejectSlotSet = new HashSet<>();
for (Expression queryNullRejectPredicate : queryNullRejectPredicates) {
Optional<Slot> notNullSlot = TypeUtils.isNotNull(queryNullRejectPredicate);
if (!notNullSlot.isPresent()) {
continue;
}
queryNullRejectSlotSet.add(notNullSlot.get());
}
// query slot need shuttle to use table slot, avoid alias influence
Set<Expression> queryUsedNeedRejectNullSlotsViewBased = ExpressionUtils.shuttleExpressionWithLineage(
new ArrayList<>(queryNullRejectSlotSet), queryStructInfo.getTopPlan(), new BitSet()).stream()
.map(expr -> ExpressionUtils.replace(expr, queryToViewMapping.toSlotReferenceMap()))
.collect(Collectors.toSet());
// view slot need shuttle to use table slot, avoid alias influence
Set<Set<Slot>> shuttledRequireNoNullableViewSlot = new HashSet<>();
for (Set<Slot> requireNullableSlots : requireNoNullableViewSlot) {
shuttledRequireNoNullableViewSlot.add(
ExpressionUtils.shuttleExpressionWithLineage(new ArrayList<>(requireNullableSlots),
viewStructInfo.getTopPlan(), new BitSet()).stream().map(Slot.class::cast)
.collect(Collectors.toSet()));
}
// query pulledUp predicates should have null reject predicates and contains any require noNullable slot
return !queryPulledUpPredicates.containsAll(nullRejectPredicates)
&& requireNoNullableViewSlot.stream().noneMatch(set ->
Sets.intersection(set, queryUsedNeedRejectNullSlotsViewBased).isEmpty());
return shuttledRequireNoNullableViewSlot.stream().noneMatch(viewRequiredNullSlotSet ->
Sets.intersection(viewRequiredNullSlotSet, queryUsedNeedRejectNullSlotsViewBased).isEmpty());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,51 @@
2023-12-11 4 2
2023-12-12 5 4

-- !query10_0_before --
2023-12-09 1 yy 2 4
2023-12-11 2 mm 4 3

-- !query10_0_after --
2023-12-09 1 yy 2 4
2023-12-11 2 mm 4 3

-- !query11_0_before --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

-- !query11_0_after --
2023-12-09 1 yy 95 4
2023-12-09 1 yy 96 4
2023-12-09 1 yy 97 4
2023-12-10 1 yy 100 2
2023-12-10 1 yy 101 2
2023-12-10 1 yy 98 2
2023-12-10 1 yy 99 2
2023-12-11 2 mm 102 3
2023-12-11 2 mm 103 3
2023-12-11 2 mm 104 3
2023-12-12 2 mi 105 2
2023-12-12 2 mi 105 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 106 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 107 2
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2

Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,91 @@ suite("outer_join") {
(2, 3, 10, 11.01, 'supply2');
"""

sql """drop table if exists orders_same_col;"""
sql """
CREATE TABLE IF NOT EXISTS orders_same_col (
o_orderkey INTEGER NOT NULL,
o_custkey INTEGER NOT NULL,
o_orderstatus CHAR(1) NOT NULL,
o_totalprice DECIMALV3(15,2) NOT NULL,
o_orderdate DATE NOT NULL,
o_orderpriority CHAR(15) NOT NULL,
o_clerk CHAR(15) NOT NULL,
o_shippriority INTEGER NOT NULL,
O_COMMENT VARCHAR(79) NOT NULL,
o_code VARCHAR(6) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);"""

sql """
insert into orders_same_col values
(1, 1, 'o', 9.5, '2023-12-08', 'a', 'b', 1, 'yy', '91'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy', '92'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy', '93'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy', '94'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy', '95'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy', '96'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy', '97'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy', '98'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy', '99'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy', '100'),
(3, 1, 'o', 33.5, '2023-12-10', 'a', 'b', 1, 'yy', '101'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm', '102'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm', '103'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm', '104'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi', '105'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi', '106'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi', '107'),
(5, 2, 'o', 1.2, '2023-12-12', 'c','d',2, 'mi', '108');
"""

sql """drop table if exists lineitem_same_col; """
sql """
CREATE TABLE IF NOT EXISTS lineitem_same_col (
l_orderkey INTEGER NOT NULL,
l_partkey INTEGER NOT NULL,
l_suppkey INTEGER NOT NULL,
l_linenumber INTEGER NOT NULL,
l_quantity DECIMALV3(15,2) NOT NULL,
l_extendedprice DECIMALV3(15,2) NOT NULL,
l_discount DECIMALV3(15,2) NOT NULL,
l_tax DECIMALV3(15,2) NOT NULL,
l_returnflag CHAR(1) NOT NULL,
l_linestatus CHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct CHAR(25) NOT NULL,
l_shipmode CHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL,
o_code VARCHAR(6) NOT NULL
)
DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber)
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
"""

sql """
insert into lineitem_same_col values
(1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-08', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '91'),
(2, 4, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-09', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '92'),
(3, 2, 4, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-10', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '93'),
(4, 3, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-12-11', '2023-12-09', '2023-12-10', 'a', 'b', 'yyyyyyyyy', '94'),
(5, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-12-12', '2023-12-12', '2023-12-13', 'c', 'd', 'xxxxxxxxx','95');
"""

sql """analyze table lineitem with sync;"""
sql """analyze table orders with sync;"""
sql """analyze table partsupp with sync;"""
sql """analyze table orders_same_col with sync;"""
sql """analyze table lineitem_same_col with sync;"""

// without filter
def mv1_0 = "select lineitem.L_LINENUMBER, orders.O_CUSTKEY " +
Expand Down Expand Up @@ -606,4 +688,75 @@ suite("outer_join") {
async_mv_rewrite_success(db, mv9_0, query9_0, "mv9_0")
order_qt_query9_0_after "${query9_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv9_0"""


// Test filter which can not push down through join and there is more than two join
def mv10_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
l_orderkey,
l_partkey
from
orders left
join lineitem on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""

def query10_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
l_orderkey,
l_partkey
from
orders left
join lineitem on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l_partkey is null or l_partkey <> 2;
"""

order_qt_query10_0_before "${query10_0}"
async_mv_rewrite_success(db, mv10_0, query10_0, "mv10_0")
order_qt_query10_0_after "${query10_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv10_0"""


// Test where filter contains the same col in both lineitem_same_col and orders_same_col
def mv11_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code as o_o_code,
l_orderkey,
l_partkey,
l.o_code as l_o_code
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey;
"""

def query11_0 = """
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code
l_orderkey,
l_partkey
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey = ps_suppkey
where l.o_code <> '91';
"""

order_qt_query11_0_before "${query11_0}"
async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0")
order_qt_query11_0_after "${query11_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0"""
}

0 comments on commit 91dcc0e

Please sign in to comment.