Skip to content

Commit

Permalink
feat: merge ApplyJoins more aggressively
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Oct 15, 2024
1 parent 45a8067 commit 3e2dec8
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 139 deletions.
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// mergeJoinInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned.
// If they cannot be merged, nil is returned.
func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr) *Route {
func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs Operator) *Route {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil
Expand All @@ -38,7 +38,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs
// If a dual is on the left side, and it is a left join (all right joins are changed to left joins), then we can only merge if the right side is a single sharded routing.
case a == dual:
rhsClone := Clone(rhs).(*Route)
for _, predicate := range joinPredicates {
for _, predicate := range jm.predicates {
if ctx.SemTable.DirectDeps(predicate).IsSolvedBy(TableID(rhsClone)) {
rhsClone.AddPredicate(ctx, predicate)
}
Expand All @@ -54,7 +54,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs

// As both are reference routes, we need to merge the alternates as well.
case a == anyShard && b == anyShard && sameKeyspace:
newrouting := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), joinPredicates, jm.joinType)
newrouting := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), jm.predicates, jm.joinType)
return jm.merge(ctx, lhsRoute, rhsRoute, newrouting)

// an unsharded/reference route can be merged with anything going to that keyspace
Expand All @@ -75,7 +75,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs

// sharded routing is complex, so we handle it in a separate method
case a == sharded && b == sharded:
return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, jm, joinPredicates)
return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, jm, jm.predicates)

default:
return nil
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"
"strconv"

"vitess.io/vitess/go/slice"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
Expand Down Expand Up @@ -104,7 +106,16 @@ func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator {
return tryPushUpdate(in)
case *RecurseCTE:
return tryMergeRecurse(ctx, in)

case *ApplyJoin:
predicates := slice.Map(in.JoinPredicates.columns, func(j applyJoinColumn) sqlparser.Expr {
return j.Original
})
jm := newJoinMerge(predicates, in.JoinType)
newPlan := jm.mergeJoinInputs(ctx, in.LHS, in.RHS)
if newPlan == nil {
return in, NoRewrite
}
return newPlan, Rewrote("merge routes into single operator")
default:
return in, NoRewrite
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func requiresSwitchingSides(ctx *plancontext.PlanningContext, op Operator) (requ

func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr, joinType sqlparser.JoinType) (Operator, *ApplyResult) {
jm := newJoinMerge(joinPredicates, joinType)
newPlan := jm.mergeJoinInputs(ctx, lhs, rhs, joinPredicates)
newPlan := jm.mergeJoinInputs(ctx, lhs, rhs)
if newPlan != nil {
return newPlan, Rewrote("merge routes into single operator")
}
Expand Down
46 changes: 12 additions & 34 deletions go/vt/vtgate/planbuilder/testdata/cte_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -2329,41 +2329,19 @@
"QueryType": "SELECT",
"Original": "WITH RECURSIVE literal_cte AS (SELECT 1 AS id, 100 AS value, 1 AS manager_id UNION ALL SELECT id + 1, value * 2, id FROM literal_cte WHERE id < 5) SELECT l.id, l.value, l.manager_id, e.name AS employee_name FROM literal_cte l LEFT JOIN user e ON l.id = e.id",
"Instructions": {
"OperatorType": "Join",
"Variant": "LeftJoin",
"JoinColumnIndexes": "L:0,L:1,L:2,R:0",
"JoinVars": {
"l_id": 0
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TableName": "dual_`user`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual where 1 != 1 union all select id + 1, value * 2, id from literal_cte where 1 != 1) select l.id, l.value, l.manager_id from literal_cte as l where 1 != 1",
"Query": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual union all select id + 1, value * 2, id from literal_cte where id < 5) select l.id, l.value, l.manager_id from literal_cte as l",
"Table": "dual"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select e.`name` as employee_name from `user` as e where 1 != 1",
"Query": "select e.`name` as employee_name from `user` as e where e.id = :l_id",
"Table": "`user`",
"Values": [
":l_id"
],
"Vindex": "user_index"
}
]
"FieldQuery": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual where 1 != 1 union all select id + 1, value * 2, id from literal_cte where 1 != 1) select l.id, l.value, l.manager_id, e.`name` as employee_name from literal_cte as l left join `user` as e on l.id = e.id where 1 != 1",
"Query": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual union all select id + 1, value * 2, id from literal_cte where id < 5) select l.id, l.value, l.manager_id, e.`name` as employee_name from literal_cte as l left join `user` as e on l.id = e.id",
"Table": "`user`, dual",
"Values": [
":l_id"
],
"Vindex": "user_index"
},
"TablesUsed": [
"main.dual",
Expand Down
40 changes: 9 additions & 31 deletions go/vt/vtgate/planbuilder/testdata/info_schema57_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1153,38 +1153,16 @@
"QueryType": "SELECT",
"Original": "SELECT c.column_name FROM information_schema.columns c JOIN ( SELECT table_name FROM information_schema.tables WHERE table_schema != 'information_schema' LIMIT 1 ) AS tables ON tables.table_name = c.table_name",
"Instructions": {
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "R:0",
"JoinVars": {
"tables_table_name": 0
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TableName": "information_schema.`tables`_information_schema.`columns`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select `tables`.table_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables` where 1 != 1",
"Query": "select `tables`.table_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select c.column_name from information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from information_schema.`columns` as c where c.table_name = :c_table_name /* VARCHAR */",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`"
}
]
"FieldQuery": "select c.column_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables`, information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`, information_schema.`columns` as c where `tables`.table_name = c.table_name",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`, information_schema.`tables`"
}
}
}
Expand Down
40 changes: 9 additions & 31 deletions go/vt/vtgate/planbuilder/testdata/info_schema80_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1275,38 +1275,16 @@
"QueryType": "SELECT",
"Original": "SELECT c.column_name FROM information_schema.columns c JOIN ( SELECT table_name FROM information_schema.tables WHERE table_schema != 'information_schema' LIMIT 1 ) AS tables ON tables.table_name = c.table_name",
"Instructions": {
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "R:0",
"JoinVars": {
"tables_table_name": 0
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TableName": "information_schema.`tables`_information_schema.`columns`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select `tables`.table_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables` where 1 != 1",
"Query": "select `tables`.table_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select c.column_name from information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from information_schema.`columns` as c where c.table_name = :c_table_name /* VARCHAR */",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`"
}
]
"FieldQuery": "select c.column_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables`, information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`, information_schema.`columns` as c where `tables`.table_name = c.table_name",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`, information_schema.`tables`"
}
}
}
Expand Down
52 changes: 15 additions & 37 deletions go/vt/vtgate/planbuilder/testdata/reference_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,37 +168,15 @@
"QueryType": "SELECT",
"Original": "SELECT u.id FROM ( SELECT a.id, a.u_id FROM user.ref_with_source AS a WHERE a.id IN (3) ORDER BY a.d_at LIMIT 1) as u LEFT JOIN user.ref_with_source AS u0 ON u.u_id = u0.u_uid ORDER BY u.id",
"Instructions": {
"OperatorType": "Join",
"Variant": "LeftJoin",
"JoinColumnIndexes": "L:0",
"JoinVars": {
"u_u_id": 1
},
"TableName": "ref_with_source_ref_with_source",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u.id, u.u_id from (select a.id, a.u_id from ref_with_source as a where 1 != 1) as u where 1 != 1",
"Query": "select u.id, u.u_id from (select a.id, a.u_id from ref_with_source as a where a.id in (3) order by a.d_at asc limit 1) as u order by u.id asc",
"Table": "ref_with_source"
},
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from ref_with_source as u0 where 1 != 1",
"Query": "select 1 from ref_with_source as u0 where u0.u_uid = :u_u_id",
"Table": "ref_with_source"
}
]
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u.id from (select a.id, a.u_id from ref_with_source as a where 1 != 1) as u left join ref_with_source as u0 on u.u_id = u0.u_uid where 1 != 1",
"Query": "select u.id from (select a.id, a.u_id from ref_with_source as a where a.id in (3) order by a.d_at asc limit 1) as u left join ref_with_source as u0 on u.u_id = u0.u_uid order by u.id asc",
"Table": "ref_with_source"
},
"TablesUsed": [
"user.ref_with_source"
Expand Down Expand Up @@ -700,19 +678,19 @@
"QueryType": "SELECT",
"Original": "select * from user.ref_with_source ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2",
"Instructions": {
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"OperatorType": "Route",
"Variant": "EqualUnique",
"Vindex": "user_index",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id",
"Table": "`user`, ref_with_source",
"Values": [
"2"
]
],
"Vindex": "user_index"
},
"TablesUsed": [
"user.ref_with_source",
Expand All @@ -727,19 +705,19 @@
"QueryType": "SELECT",
"Original": "select * from source_of_ref ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2",
"Instructions": {
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"OperatorType": "Route",
"Variant": "EqualUnique",
"Vindex": "user_index",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id",
"Table": "`user`, ref_with_source",
"Values": [
"2"
]
],
"Vindex": "user_index"
},
"TablesUsed": [
"user.ref_with_source",
Expand Down

0 comments on commit 3e2dec8

Please sign in to comment.