From 106193a17aa62411fa7abd229a68320b90a1af8d Mon Sep 17 00:00:00 2001 From: ngjaying Date: Mon, 21 Dec 2020 14:45:36 +0800 Subject: [PATCH] feat(sql): join on optimization --- xsql/processors/common_test.go | 5 + xsql/processors/window_rule_test.go | 110 ++++++++--------- xstream/planner/dataSourcePlan.go | 32 ++++- xstream/planner/filterPlan.go | 42 +------ xstream/planner/joinPlan.go | 39 ++++++ xstream/planner/planner_test.go | 185 +++++++++++++++++++++++++++- xstream/planner/util.go | 16 ++- 7 files changed, 328 insertions(+), 101 deletions(-) diff --git a/xsql/processors/common_test.go b/xsql/processors/common_test.go index 8d1ff2e1c0..b0110f1bfe 100644 --- a/xsql/processors/common_test.go +++ b/xsql/processors/common_test.go @@ -102,6 +102,11 @@ func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err erro if matched { continue } + if common.Config.Basic.Debug { + for i, k := range keys { + log.Printf("%s:%v", k, values[i]) + } + } //do not find if index < len(values) { return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index]) diff --git a/xsql/processors/window_rule_test.go b/xsql/processors/window_rule_test.go index b070d900c6..edfe7de0ba 100644 --- a/xsql/processors/window_rule_test.go +++ b/xsql/processors/window_rule_test.go @@ -102,10 +102,10 @@ func TestWindow(t *testing.T) { "op_1_preprocessor_demo_0_records_in_total": int64(5), "op_1_preprocessor_demo_0_records_out_total": int64(5), - "op_4_project_0_exceptions_total": int64(0), - "op_4_project_0_process_latency_us": int64(0), - "op_4_project_0_records_in_total": int64(2), - "op_4_project_0_records_out_total": int64(2), + "op_3_project_0_exceptions_total": int64(0), + "op_3_project_0_process_latency_us": int64(0), + "op_3_project_0_records_in_total": int64(2), + "op_3_project_0_records_out_total": int64(2), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(2), @@ -115,15
+115,15 @@ func TestWindow(t *testing.T) { "source_demo_0_records_in_total": int64(5), "source_demo_0_records_out_total": int64(5), - "op_3_window_0_exceptions_total": int64(0), - "op_3_window_0_process_latency_us": int64(0), - "op_3_window_0_records_in_total": int64(3), - "op_3_window_0_records_out_total": int64(2), + "op_2_window_0_exceptions_total": int64(0), + "op_2_window_0_process_latency_us": int64(0), + "op_2_window_0_records_in_total": int64(3), + "op_2_window_0_records_out_total": int64(2), - "op_2_filter_0_exceptions_total": int64(0), - "op_2_filter_0_process_latency_us": int64(0), - "op_2_filter_0_records_in_total": int64(5), - "op_2_filter_0_records_out_total": int64(3), + "op_2_windowFilter_0_exceptions_total": int64(0), + "op_2_windowFilter_0_process_latency_us": int64(0), + "op_2_windowFilter_0_records_in_total": int64(5), + "op_2_windowFilter_0_records_out_total": int64(3), }, }, { name: `TestWindowRule3`, @@ -474,10 +474,10 @@ func TestWindow(t *testing.T) { "op_1_preprocessor_demo_0_records_in_total": int64(5), "op_1_preprocessor_demo_0_records_out_total": int64(5), - "op_6_project_0_exceptions_total": int64(0), - "op_6_project_0_process_latency_us": int64(0), - "op_6_project_0_records_in_total": int64(1), - "op_6_project_0_records_out_total": int64(1), + "op_5_project_0_exceptions_total": int64(0), + "op_5_project_0_process_latency_us": int64(0), + "op_5_project_0_records_in_total": int64(1), + "op_5_project_0_records_out_total": int64(1), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(1), @@ -487,25 +487,25 @@ func TestWindow(t *testing.T) { "source_demo_0_records_in_total": int64(5), "source_demo_0_records_out_total": int64(5), - "op_3_window_0_exceptions_total": int64(0), - "op_3_window_0_process_latency_us": int64(0), - "op_3_window_0_records_in_total": int64(3), - "op_3_window_0_records_out_total": int64(2), - - "op_2_filter_0_exceptions_total": int64(0), - "op_2_filter_0_process_latency_us": 
int64(0), - "op_2_filter_0_records_in_total": int64(5), - "op_2_filter_0_records_out_total": int64(3), - - "op_4_aggregate_0_exceptions_total": int64(0), - "op_4_aggregate_0_process_latency_us": int64(0), - "op_4_aggregate_0_records_in_total": int64(2), - "op_4_aggregate_0_records_out_total": int64(2), - - "op_5_having_0_exceptions_total": int64(0), - "op_5_having_0_process_latency_us": int64(0), - "op_5_having_0_records_in_total": int64(2), - "op_5_having_0_records_out_total": int64(1), + "op_2_window_0_exceptions_total": int64(0), + "op_2_window_0_process_latency_us": int64(0), + "op_2_window_0_records_in_total": int64(3), + "op_2_window_0_records_out_total": int64(2), + + "op_2_windowFilter_0_exceptions_total": int64(0), + "op_2_windowFilter_0_process_latency_us": int64(0), + "op_2_windowFilter_0_records_in_total": int64(5), + "op_2_windowFilter_0_records_out_total": int64(3), + + "op_3_aggregate_0_exceptions_total": int64(0), + "op_3_aggregate_0_process_latency_us": int64(0), + "op_3_aggregate_0_records_in_total": int64(2), + "op_3_aggregate_0_records_out_total": int64(2), + + "op_4_having_0_exceptions_total": int64(0), + "op_4_having_0_process_latency_us": int64(0), + "op_4_having_0_records_in_total": int64(2), + "op_4_having_0_records_out_total": int64(1), }, }, { name: `TestWindowRule9`, @@ -546,10 +546,10 @@ func TestWindow(t *testing.T) { "op_1_preprocessor_demo_0_records_in_total": int64(5), "op_1_preprocessor_demo_0_records_out_total": int64(5), - "op_4_project_0_exceptions_total": int64(0), - "op_4_project_0_process_latency_us": int64(0), - "op_4_project_0_records_in_total": int64(4), - "op_4_project_0_records_out_total": int64(4), + "op_3_project_0_exceptions_total": int64(0), + "op_3_project_0_process_latency_us": int64(0), + "op_3_project_0_records_in_total": int64(4), + "op_3_project_0_records_out_total": int64(4), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(4), @@ -559,10 +559,10 @@ func TestWindow(t 
*testing.T) { "source_demo_0_records_in_total": int64(5), "source_demo_0_records_out_total": int64(5), - "op_3_window_0_exceptions_total": int64(0), - "op_3_window_0_process_latency_us": int64(0), - "op_3_window_0_records_in_total": int64(3), - "op_3_window_0_records_out_total": int64(4), + "op_2_window_0_exceptions_total": int64(0), + "op_2_window_0_process_latency_us": int64(0), + "op_2_window_0_records_in_total": int64(3), + "op_2_window_0_records_out_total": int64(4), }, }, { name: `TestCountWindowRule1`, @@ -1144,10 +1144,10 @@ func TestWindowError(t *testing.T) { "op_1_preprocessor_ldemo_0_records_in_total": int64(5), "op_1_preprocessor_ldemo_0_records_out_total": int64(5), - "op_4_project_0_exceptions_total": int64(1), - "op_4_project_0_process_latency_us": int64(0), - "op_4_project_0_records_in_total": int64(3), - "op_4_project_0_records_out_total": int64(2), + "op_3_project_0_exceptions_total": int64(1), + "op_3_project_0_process_latency_us": int64(0), + "op_3_project_0_records_in_total": int64(3), + "op_3_project_0_records_out_total": int64(2), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(3), @@ -1157,15 +1157,15 @@ func TestWindowError(t *testing.T) { "source_ldemo_0_records_in_total": int64(5), "source_ldemo_0_records_out_total": int64(5), - "op_3_window_0_exceptions_total": int64(1), - "op_3_window_0_process_latency_us": int64(0), - "op_3_window_0_records_in_total": int64(3), - "op_3_window_0_records_out_total": int64(2), + "op_2_window_0_exceptions_total": int64(1), + "op_2_window_0_process_latency_us": int64(0), + "op_2_window_0_records_in_total": int64(3), + "op_2_window_0_records_out_total": int64(2), - "op_2_filter_0_exceptions_total": int64(1), - "op_2_filter_0_process_latency_us": int64(0), - "op_2_filter_0_records_in_total": int64(5), - "op_2_filter_0_records_out_total": int64(2), + "op_2_windowFilter_0_exceptions_total": int64(1), + "op_2_windowFilter_0_process_latency_us": int64(0), + 
"op_2_windowFilter_0_records_in_total": int64(5), + "op_2_windowFilter_0_records_out_total": int64(2), }, }, { name: `TestWindowErrorRule3`, diff --git a/xstream/planner/dataSourcePlan.go b/xstream/planner/dataSourcePlan.go index 14ddf53203..418a7fba7b 100644 --- a/xstream/planner/dataSourcePlan.go +++ b/xstream/planner/dataSourcePlan.go @@ -20,13 +20,37 @@ func (p DataSourcePlan) Init() *DataSourcePlan { // Presume no children for data source func (p *DataSourcePlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) { - if condition != nil { + owned, other := p.extract(condition) + if owned != nil { // Add a filter plan for children f := FilterPlan{ - condition: condition, + condition: owned, }.Init() f.SetChildren([]LogicalPlan{p}) - return nil, f + return other, f + } + return other, p +} + +func (p *DataSourcePlan) extract(expr xsql.Expr) (xsql.Expr, xsql.Expr) { + s := getRefSources(expr) + switch len(s) { + case 0: + return expr, nil + case 1: + if s[0] == p.name { + return expr, nil + } else { + return nil, expr + } + default: + if be, ok := expr.(*xsql.BinaryExpr); ok && be.OP == xsql.AND { + ul, pl := p.extract(be.LHS) + ur, pr := p.extract(be.RHS) + owned := combine(ul, ur) + other := combine(pl, pr) + return owned, other + } + return nil, expr } - return nil, p } diff --git a/xstream/planner/filterPlan.go b/xstream/planner/filterPlan.go index c6f76b449e..9b258952bd 100644 --- a/xstream/planner/filterPlan.go +++ b/xstream/planner/filterPlan.go @@ -19,14 +19,11 @@ func (p *FilterPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalP p.condition = a return nil, p } - // if has child, try to move pushable condition out - up, pp := extractCondition(a) - rest, _ := p.baseLogicalPlan.PushDownPredicate(pp) + rest, _ := p.baseLogicalPlan.PushDownPredicate(a) - up = combine(up, rest) - if up != nil { - p.condition = up + if rest != nil { + p.condition = rest return nil, p } else if len(p.children) == 1 { // eliminate this filter @@ -35,36 
+32,3 @@ func (p *FilterPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalP return nil, p } } - -// Return the unpushable condition and pushable condition -func extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql.Expr) { - s := GetRefSources(condition) - if len(s) < 2 { - pushable = condition - return - } else { - if be, ok := condition.(*xsql.BinaryExpr); ok && be.OP == xsql.AND { - ul, pl := extractCondition(be.LHS) - ur, pr := extractCondition(be.RHS) - unpushable = combine(ul, ur) - pushable = combine(pl, pr) - return - } - } - //default case: all condition are unpushable - return condition, nil -} - -func combine(l xsql.Expr, r xsql.Expr) xsql.Expr { - if l != nil && r != nil { - return &xsql.BinaryExpr{ - OP: xsql.AND, - LHS: l, - RHS: r, - } - } else if l != nil { - return l - } else { - return r - } -} diff --git a/xstream/planner/joinPlan.go b/xstream/planner/joinPlan.go index dd3afd65dc..8712e54a28 100644 --- a/xstream/planner/joinPlan.go +++ b/xstream/planner/joinPlan.go @@ -12,3 +12,42 @@ func (p JoinPlan) Init() *JoinPlan { p.baseLogicalPlan.self = &p return &p } + +func (p *JoinPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) { + //TODO multiple join support + //Assume only one join + j := p.joins[0] + switch j.JoinType { + case xsql.INNER_JOIN: + a := combine(condition, j.Expr) + multipleSourcesCondition, singleSourceCondition := extractCondition(a) + rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition) + j.Expr = combine(multipleSourcesCondition, rest) //always swallow all conditions + p.joins[0] = j + return nil, p + default: //TODO fine grain handling for left/right join + multipleSourcesCondition, singleSourceCondition := extractCondition(condition) + rest, _ := p.baseLogicalPlan.PushDownPredicate(singleSourceCondition) + // never swallow anything + return combine(multipleSourcesCondition, rest), p + } +} + +// Return the unpushable condition and pushable condition +func 
extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql.Expr) { + s := getRefSources(condition) + if len(s) < 2 { + pushable = condition + return + } else { + if be, ok := condition.(*xsql.BinaryExpr); ok && be.OP == xsql.AND { + ul, pl := extractCondition(be.LHS) + ur, pr := extractCondition(be.RHS) + unpushable = combine(ul, ur) + pushable = combine(pl, pr) + return + } + } + //default case: all condition are unpushable + return condition, nil +} diff --git a/xstream/planner/planner_test.go b/xstream/planner/planner_test.go index 0379d497af..dee40b1662 100644 --- a/xstream/planner/planner_test.go +++ b/xstream/planner/planner_test.go @@ -271,15 +271,196 @@ func Test_createLogicalPlan(t *testing.T) { isAggregate: false, sendMeta: false, }.Init(), + }, { // 5. optimize join on + sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`, + p: ProjectPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + JoinPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + WindowPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + FilterPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + DataSourcePlan{ + name: "src1", + isWildCard: true, + needMeta: false, + fields: nil, + metaFields: nil, + }.Init(), + }, + }, + condition: &xsql.BinaryExpr{ + RHS: &xsql.BinaryExpr{ + OP: xsql.GT, + LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"}, + RHS: &xsql.IntegerLiteral{Val: 20}, + }, + OP: xsql.AND, + LHS: &xsql.BinaryExpr{ + OP: xsql.GT, + LHS: &xsql.FieldRef{Name: "id", StreamName: "src1"}, + RHS: &xsql.IntegerLiteral{Val: 111}, + }, + }, + }.Init(), + FilterPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + DataSourcePlan{ + name: "src2", + isWildCard: true, + needMeta: false, + fields: nil, + metaFields: nil, + }.Init(), + }, + }, + condition: &xsql.BinaryExpr{ + OP: xsql.LT, 
+ LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"}, + RHS: &xsql.IntegerLiteral{Val: 60}, + }, + }.Init(), + }, + }, + condition: nil, + wtype: xsql.TUMBLING_WINDOW, + length: 10000, + interval: 0, + limit: 0, + }.Init(), + }, + }, + from: &xsql.Table{ + Name: "src1", + }, + joins: []xsql.Join{ + { + Name: "src2", + Alias: "", + JoinType: xsql.INNER_JOIN, + Expr: &xsql.BinaryExpr{ + LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"}, + OP: xsql.EQ, + RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"}, + }, + }, + }, + }.Init(), + }, + }, + fields: []xsql.Field{ + { + Expr: &xsql.FieldRef{Name: "id1"}, + Name: "id1", + AName: ""}, + }, + isAggregate: false, + sendMeta: false, + }.Init(), + }, { // 6. optimize outer join on + sql: `SELECT id1 FROM src1 FULL JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`, + p: ProjectPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + JoinPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + WindowPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + FilterPlan{ + baseLogicalPlan: baseLogicalPlan{ + children: []LogicalPlan{ + DataSourcePlan{ + name: "src1", + isWildCard: true, + needMeta: false, + fields: nil, + metaFields: nil, + }.Init(), + }, + }, + condition: &xsql.BinaryExpr{ + OP: xsql.GT, + LHS: &xsql.FieldRef{Name: "id", StreamName: "src1"}, + RHS: &xsql.IntegerLiteral{Val: 111}, + }, + }.Init(), + DataSourcePlan{ + name: "src2", + isWildCard: true, + needMeta: false, + fields: nil, + metaFields: nil, + }.Init(), + }, + }, + condition: nil, + wtype: xsql.TUMBLING_WINDOW, + length: 10000, + interval: 0, + limit: 0, + }.Init(), + }, + }, + from: &xsql.Table{ + Name: "src1", + }, + joins: []xsql.Join{ + { + Name: "src2", + Alias: "", + JoinType: xsql.FULL_JOIN, + Expr: &xsql.BinaryExpr{ + OP: xsql.AND, + LHS: &xsql.BinaryExpr{ + OP: xsql.AND, + LHS: &xsql.BinaryExpr{ + LHS: &xsql.FieldRef{Name:
"id1", StreamName: "src1"}, + OP: xsql.EQ, + RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"}, + }, + RHS: &xsql.BinaryExpr{ + OP: xsql.GT, + LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"}, + RHS: &xsql.IntegerLiteral{Val: 20}, + }, + }, + RHS: &xsql.BinaryExpr{ + OP: xsql.LT, + LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"}, + RHS: &xsql.IntegerLiteral{Val: 60}, + }, + }, + }, + }, + }.Init(), + }, + }, + fields: []xsql.Field{ + { + Expr: &xsql.FieldRef{Name: "id1"}, + Name: "id1", + AName: ""}, + }, + isAggregate: false, + sendMeta: false, + }.Init(), }, } //TODO optimize having, optimize on fmt.Printf("The test bucket size is %d.\n\n", len(tests)) - for i, tt := range tests { - //fmt.Printf("Parsing SQL %q.\n", tt.s) + for i, tt := range tests { stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse() if err != nil { t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err) + continue } p, err := createLogicalPlan(stmt, &api.RuleOption{ IsEventTime: false, diff --git a/xstream/planner/util.go b/xstream/planner/util.go index 11ce437030..d59347a785 100644 --- a/xstream/planner/util.go +++ b/xstream/planner/util.go @@ -2,7 +2,7 @@ package planner import "github.com/emqx/kuiper/xsql" -func GetRefSources(node xsql.Node) []string { +func getRefSources(node xsql.Node) []string { result := make(map[string]bool) keys := make([]string, 0, len(result)) if node == nil { @@ -18,3 +18,17 @@ func GetRefSources(node xsql.Node) []string { } return keys } + +func combine(l xsql.Expr, r xsql.Expr) xsql.Expr { + if l != nil && r != nil { + return &xsql.BinaryExpr{ + OP: xsql.AND, + LHS: l, + RHS: r, + } + } else if l != nil { + return l + } else { + return r + } +}