From c14dbd254eccf79b030d4373e1d3896aabc340d1 Mon Sep 17 00:00:00 2001 From: ngjaying Date: Mon, 28 Dec 2020 18:06:22 +0800 Subject: [PATCH] feat(sql): column pruning --- xsql/ast.go | 1 + xsql/ast_agg_stmt_test.go | 2 +- xsql/functions.go | 21 +- xsql/processors/rule_test.go | 59 +---- xsql/processors/window_rule_test.go | 110 ++++---- xstream/operators/preprocessor.go | 59 +++-- xstream/operators/preprocessor_test.go | 31 ++- xstream/planner/aggregatePlan.go | 5 + xstream/planner/dataSourcePlan.go | 151 ++++++++++- xstream/planner/filterPlan.go | 5 + xstream/planner/havingPlan.go | 5 + xstream/planner/joinPlan.go | 5 + xstream/planner/logicalPlan.go | 12 + xstream/planner/optimizer.go | 1 + xstream/planner/orderPlan.go | 5 + xstream/planner/planner.go | 39 ++- xstream/planner/planner_test.go | 332 +++++++++++++++++-------- xstream/planner/projectPlan.go | 5 + xstream/planner/rules.go | 11 + xstream/planner/util.go | 17 ++ xstream/planner/windowPlan.go | 5 + 21 files changed, 598 insertions(+), 283 deletions(-) diff --git a/xsql/ast.go b/xsql/ast.go index a760d73bc0..bc0d107cfa 100644 --- a/xsql/ast.go +++ b/xsql/ast.go @@ -421,6 +421,7 @@ func Walk(v Visitor, node Node) { Walk(v, n.Joins) Walk(v, n.Condition) Walk(v, n.SortFields) + Walk(v, n.Having) case SortFields: for _, sf := range n { diff --git a/xsql/ast_agg_stmt_test.go b/xsql/ast_agg_stmt_test.go index 8939691f4f..67aea03fb1 100644 --- a/xsql/ast_agg_stmt_test.go +++ b/xsql/ast_agg_stmt_test.go @@ -24,7 +24,7 @@ func TestIsAggStatement(t *testing.T) { {s: `SELECT max(f1) FROM tbl`, agg: true}, {s: `SELECT min(f1) FROM tbl`, agg: true}, {s: `SELECT count(f1) FROM tbl group by tumblingwindow(ss, 5)`, agg: true}, - + {s: `SELECT f1 FROM tbl group by tumblingwindow(ss, 5) having count(f1) > 3`, agg: false}, {s: `SELECT f1 FROM tbl left join tbl2 on tbl1.f1 = tbl2.f2`, agg: false}, } diff --git a/xsql/functions.go b/xsql/functions.go index af0d4479b5..110b1f194d 100644 --- a/xsql/functions.go +++ b/xsql/functions.go @@ -112,20 +112,21 @@ func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bo } } -func IsAggStatement(node Node) bool { - var r = false - WalkFunc(node, func(n Node) { - if f, ok := n.(*Call); ok { +func IsAggStatement(stmt *SelectStatement) bool { + if stmt.Dimensions != nil { + ds := stmt.Dimensions.GetGroups() + if ds != nil && len(ds) > 0 { + return true + } + } + r := false + WalkFunc(stmt.Fields, func(n Node) { + switch f := n.(type) { + case *Call: if ok := isAggFunc(f); ok { r = true return } - } else if d, ok := n.(Dimensions); ok { - ds := d.GetGroups() - if ds != nil && len(ds) > 0 { - r = true - return - } } }) return r diff --git a/xsql/processors/rule_test.go b/xsql/processors/rule_test.go index 7047c72a1b..0605dab0d8 100644 --- a/xsql/processors/rule_test.go +++ b/xsql/processors/rule_test.go @@ -149,71 +149,30 @@ func TestSingleSQL(t *testing.T) { sql: `SELECT size as Int8, ts FROM demoError where size > 3`, r: [][]map[string]interface{}{ {{ - "error": "error in preprocessor: invalid data type for color, expect string but found int(3)", + "error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)", }}, {{ "Int8": float64(6), "ts": float64(1541152486822), }}, {{ - "error": "error in preprocessor: invalid data type for color, expect string but found int(7)", - }}, - {{ - "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)", - }}, - }, - m: map[string]interface{}{ - "op_1_preprocessor_demoError_0_exceptions_total": int64(3), - "op_1_preprocessor_demoError_0_process_latency_us": int64(0), - "op_1_preprocessor_demoError_0_records_in_total": int64(5), - "op_1_preprocessor_demoError_0_records_out_total": int64(2), - - "op_3_project_0_exceptions_total": int64(3), - "op_3_project_0_process_latency_us": int64(0), - "op_3_project_0_records_in_total": int64(4), - "op_3_project_0_records_out_total": int64(1), - - "sink_mockSink_0_exceptions_total": int64(0), - "sink_mockSink_0_records_in_total": int64(4), - "sink_mockSink_0_records_out_total": int64(4), - - "source_demoError_0_exceptions_total": int64(0), - "source_demoError_0_records_in_total": int64(5), - "source_demoError_0_records_out_total": int64(5), - - "op_2_filter_0_exceptions_total": int64(3), - "op_2_filter_0_process_latency_us": int64(0), - "op_2_filter_0_records_in_total": int64(5), - "op_2_filter_0_records_out_total": int64(1), - }, - }, { - name: `TestSingleSQLRule4`, - sql: `SELECT size as Int8, ts FROM demoError where size > 3`, - r: [][]map[string]interface{}{ - {{ - "error": "error in preprocessor: invalid data type for color, expect string but found int(3)", - }}, - {{ - "Int8": float64(6), - "ts": float64(1541152486822), - }}, - {{ - "error": "error in preprocessor: invalid data type for color, expect string but found int(7)", + "Int8": float64(4), + "ts": float64(1541152488442), }}, {{ "error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)", }}, }, m: map[string]interface{}{ - "op_1_preprocessor_demoError_0_exceptions_total": int64(3), + "op_1_preprocessor_demoError_0_exceptions_total": int64(2), "op_1_preprocessor_demoError_0_process_latency_us": int64(0), "op_1_preprocessor_demoError_0_records_in_total": int64(5), - "op_1_preprocessor_demoError_0_records_out_total": int64(2), + "op_1_preprocessor_demoError_0_records_out_total": int64(3), - "op_3_project_0_exceptions_total": int64(3), + "op_3_project_0_exceptions_total": int64(2), "op_3_project_0_process_latency_us": int64(0), "op_3_project_0_records_in_total": int64(4), - "op_3_project_0_records_out_total": int64(1), + "op_3_project_0_records_out_total": int64(2), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(4), @@ -223,10 +182,10 @@ func TestSingleSQL(t *testing.T) { "source_demoError_0_records_in_total": int64(5), "source_demoError_0_records_out_total": int64(5), - "op_2_filter_0_exceptions_total": int64(3), + "op_2_filter_0_exceptions_total": int64(2), "op_2_filter_0_process_latency_us": int64(0), "op_2_filter_0_records_in_total": int64(5), - "op_2_filter_0_records_out_total": int64(1), + "op_2_filter_0_records_out_total": int64(2), }, }, { name: `TestSingleSQLRule5`, diff --git a/xsql/processors/window_rule_test.go b/xsql/processors/window_rule_test.go index edfe7de0ba..b070d900c6 100644 --- a/xsql/processors/window_rule_test.go +++ b/xsql/processors/window_rule_test.go @@ -102,10 +102,10 @@ func TestWindow(t *testing.T) { "op_1_preprocessor_demo_0_records_in_total": int64(5), "op_1_preprocessor_demo_0_records_out_total": int64(5), - "op_3_project_0_exceptions_total": int64(0), - "op_3_project_0_process_latency_us": int64(0), - "op_3_project_0_records_in_total": int64(2), - "op_3_project_0_records_out_total": int64(2), + "op_4_project_0_exceptions_total": int64(0), + "op_4_project_0_process_latency_us": int64(0), + "op_4_project_0_records_in_total": int64(2), + "op_4_project_0_records_out_total": int64(2), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(2), @@ -115,15 +115,15 @@ func TestWindow(t *testing.T) { "source_demo_0_records_in_total": int64(5), "source_demo_0_records_out_total": int64(5), - "op_2_window_0_exceptions_total": int64(0), - "op_2_window_0_process_latency_us": int64(0), - "op_2_window_0_records_in_total": int64(3), - "op_2_window_0_records_out_total": int64(2), + "op_3_window_0_exceptions_total": int64(0), + "op_3_window_0_process_latency_us": int64(0), + "op_3_window_0_records_in_total": int64(3), + "op_3_window_0_records_out_total": int64(2), - "op_2_windowFilter_0_exceptions_total": int64(0), - "op_2_windowFilter_0_process_latency_us": int64(0), - "op_2_windowFilter_0_records_in_total": int64(5), - "op_2_windowFilter_0_records_out_total": int64(3), + "op_2_filter_0_exceptions_total": int64(0), + "op_2_filter_0_process_latency_us": int64(0), + "op_2_filter_0_records_in_total": int64(5), + "op_2_filter_0_records_out_total": int64(3), }, }, { name: `TestWindowRule3`, @@ -474,10 +474,10 @@ func TestWindow(t *testing.T) { "op_1_preprocessor_demo_0_records_in_total": int64(5), "op_1_preprocessor_demo_0_records_out_total": int64(5), - "op_5_project_0_exceptions_total": int64(0), - "op_5_project_0_process_latency_us": int64(0), - "op_5_project_0_records_in_total": int64(1), - "op_5_project_0_records_out_total": int64(1), + "op_6_project_0_exceptions_total": int64(0), + "op_6_project_0_process_latency_us": int64(0), + "op_6_project_0_records_in_total": int64(1), + "op_6_project_0_records_out_total": int64(1), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(1), @@ -487,25 +487,25 @@ func TestWindow(t *testing.T) { "source_demo_0_records_in_total": int64(5), "source_demo_0_records_out_total": int64(5), - "op_2_window_0_exceptions_total": int64(0), - "op_2_window_0_process_latency_us": int64(0), - "op_2_window_0_records_in_total": int64(3), - "op_2_window_0_records_out_total": int64(2), - - "op_2_windowFilter_0_exceptions_total": int64(0), - "op_2_windowFilter_0_process_latency_us": int64(0), - "op_2_windowFilter_0_records_in_total": int64(5), - "op_2_windowFilter_0_records_out_total": int64(3), - - "op_3_aggregate_0_exceptions_total": int64(0), - "op_3_aggregate_0_process_latency_us": int64(0), - "op_3_aggregate_0_records_in_total": int64(2), - "op_3_aggregate_0_records_out_total": int64(2), - - "op_4_having_0_exceptions_total": int64(0), - "op_4_having_0_process_latency_us": int64(0), - "op_4_having_0_records_in_total": int64(2), - "op_4_having_0_records_out_total": int64(1), + "op_3_window_0_exceptions_total": int64(0), + "op_3_window_0_process_latency_us": int64(0), + "op_3_window_0_records_in_total": int64(3), + "op_3_window_0_records_out_total": int64(2), + + "op_2_filter_0_exceptions_total": int64(0), + "op_2_filter_0_process_latency_us": int64(0), + "op_2_filter_0_records_in_total": int64(5), + "op_2_filter_0_records_out_total": int64(3), + + "op_4_aggregate_0_exceptions_total": int64(0), + "op_4_aggregate_0_process_latency_us": int64(0), + "op_4_aggregate_0_records_in_total": int64(2), + "op_4_aggregate_0_records_out_total": int64(2), + + "op_5_having_0_exceptions_total": int64(0), + "op_5_having_0_process_latency_us": int64(0), + "op_5_having_0_records_in_total": int64(2), + "op_5_having_0_records_out_total": int64(1), }, }, { name: `TestWindowRule9`, @@ -546,10 +546,10 @@ func TestWindow(t *testing.T) { "op_1_preprocessor_demo_0_records_in_total": int64(5), "op_1_preprocessor_demo_0_records_out_total": int64(5), - "op_3_project_0_exceptions_total": int64(0), - "op_3_project_0_process_latency_us": int64(0), - "op_3_project_0_records_in_total": int64(4), - "op_3_project_0_records_out_total": int64(4), + "op_4_project_0_exceptions_total": int64(0), + "op_4_project_0_process_latency_us": int64(0), + "op_4_project_0_records_in_total": int64(4), + "op_4_project_0_records_out_total": int64(4), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(4), @@ -559,10 +559,10 @@ func TestWindow(t *testing.T) { "source_demo_0_records_in_total": int64(5), "source_demo_0_records_out_total": int64(5), - "op_2_window_0_exceptions_total": int64(0), - "op_2_window_0_process_latency_us": int64(0), - "op_2_window_0_records_in_total": int64(3), - "op_2_window_0_records_out_total": int64(4), + "op_3_window_0_exceptions_total": int64(0), + "op_3_window_0_process_latency_us": int64(0), + "op_3_window_0_records_in_total": int64(3), + "op_3_window_0_records_out_total": int64(4), }, }, { name: `TestCountWindowRule1`, @@ -1144,10 +1144,10 @@ func TestWindowError(t *testing.T) { "op_1_preprocessor_ldemo_0_records_in_total": int64(5), "op_1_preprocessor_ldemo_0_records_out_total": int64(5), - "op_3_project_0_exceptions_total": int64(1), - "op_3_project_0_process_latency_us": int64(0), - "op_3_project_0_records_in_total": int64(3), - "op_3_project_0_records_out_total": int64(2), + "op_4_project_0_exceptions_total": int64(1), + "op_4_project_0_process_latency_us": int64(0), + "op_4_project_0_records_in_total": int64(3), + "op_4_project_0_records_out_total": int64(2), "sink_mockSink_0_exceptions_total": int64(0), "sink_mockSink_0_records_in_total": int64(3), @@ -1157,15 +1157,15 @@ func TestWindowError(t *testing.T) { "source_ldemo_0_records_in_total": int64(5), "source_ldemo_0_records_out_total": int64(5), - "op_2_window_0_exceptions_total": int64(1), - "op_2_window_0_process_latency_us": int64(0), - "op_2_window_0_records_in_total": int64(3), - "op_2_window_0_records_out_total": int64(2), + "op_3_window_0_exceptions_total": int64(1), + "op_3_window_0_process_latency_us": int64(0), + "op_3_window_0_records_in_total": int64(3), + "op_3_window_0_records_out_total": int64(2), - "op_2_windowFilter_0_exceptions_total": int64(1), - "op_2_windowFilter_0_process_latency_us": int64(0), - "op_2_windowFilter_0_records_in_total": int64(5), - "op_2_windowFilter_0_records_out_total": int64(2), + "op_2_filter_0_exceptions_total": int64(1), + "op_2_filter_0_process_latency_us": int64(0), + "op_2_filter_0_records_in_total": int64(5), + "op_2_filter_0_records_out_total": int64(2), }, }, { name: `TestWindowErrorRule3`, diff --git a/xstream/operators/preprocessor.go b/xstream/operators/preprocessor.go index 3d70ef6fa9..5a5069c733 100644 --- a/xstream/operators/preprocessor.go +++ b/xstream/operators/preprocessor.go @@ -15,26 +15,19 @@ import ( ) type Preprocessor struct { - streamStmt *xsql.StreamStmt + //Pruned stream fields. Could be streamField(with data type info) or string + streamFields []interface{} aliasFields xsql.Fields + allMeta bool + metaFields []string //only needed if not allMeta isEventTime bool timestampField string timestampFormat string isBinary bool } -func NewPreprocessor(s *xsql.StreamStmt, fs xsql.Fields, iet bool, isBinary bool) (*Preprocessor, error) { - p := &Preprocessor{streamStmt: s, aliasFields: fs, isEventTime: iet, isBinary: isBinary} - if iet { - if tf, ok := s.Options["TIMESTAMP"]; ok { - p.timestampField = tf - } else { - return nil, fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found") - } - if ts, ok := s.Options["TIMESTAMP_FORMAT"]; ok { - p.timestampFormat = ts - } - } +func NewPreprocessor(fields []interface{}, fs xsql.Fields, allMeta bool, metaFields []string, iet bool, timestampField string, timestampFormat string, isBinary bool) (*Preprocessor, error) { + p := &Preprocessor{streamFields: fields, aliasFields: fs, allMeta: allMeta, metaFields: metaFields, isEventTime: iet, isBinary: isBinary, timestampFormat: timestampFormat, timestampField: timestampField} return p, nil } @@ -52,18 +45,27 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.F log.Debugf("preprocessor receive %s", tuple.Message) result := make(map[string]interface{}) - if p.streamStmt.StreamFields != nil { - if p.isBinary { - f := p.streamStmt.StreamFields[0] - tuple.Message[f.Name] = tuple.Message[common.DEFAULT_FIELD] - if e := p.addRecField(f.FieldType, result, tuple.Message, f.Name); e != nil { - return fmt.Errorf("error in preprocessor: %s", e) - } - } else { - for _, f := range p.streamStmt.StreamFields { - if e := p.addRecField(f.FieldType, result, tuple.Message, f.Name); e != nil { + if p.streamFields != nil { + for _, f := range p.streamFields { + switch sf := f.(type) { + case *xsql.StreamField: + if p.isBinary { + tuple.Message[sf.Name] = tuple.Message[common.DEFAULT_FIELD] + } + if e := p.addRecField(sf.FieldType, result, tuple.Message, sf.Name); e != nil { return fmt.Errorf("error in preprocessor: %s", e) } + case string: //schemaless + if p.isBinary { + result = tuple.Message + } else { + if m, ok := tuple.Message.Value(sf); ok { + result[sf] = m + } + } + } + if p.isBinary { + break //binary format should only have ONE field } } } else { @@ -95,12 +97,19 @@ func (p *Preprocessor) Apply(ctx api.StreamContext, data interface{}, fv *xsql.F return fmt.Errorf("cannot find timestamp field %s in tuple %v", p.timestampField, result) } } + if !p.allMeta && p.metaFields != nil && len(p.metaFields) > 0 { + newMeta := make(xsql.Metadata) + for _, f := range p.metaFields { + newMeta[f] = tuple.Metadata[f] + } + tuple.Metadata = newMeta + } return tuple } func (p *Preprocessor) parseTime(s string) (time.Time, error) { - if f, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok { - return common.ParseTime(s, f) + if p.timestampFormat != "" { + return common.ParseTime(s, p.timestampFormat) } else { return time.Parse(common.JSISO, s) } diff --git a/xstream/operators/preprocessor_test.go b/xstream/operators/preprocessor_test.go index 1c047d3046..0b6ca7cfec 100644 --- a/xstream/operators/preprocessor_test.go +++ b/xstream/operators/preprocessor_test.go @@ -525,7 +525,7 @@ func TestPreprocessor_Apply(t *testing.T) { contextLogger := common.Log.WithField("rule", "TestPreprocessor_Apply") ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger) for i, tt := range tests { - pp := &Preprocessor{streamStmt: tt.stmt} + pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields)} dm := make(map[string]interface{}) if e := json.Unmarshal(tt.data, &dm); e != nil { @@ -658,9 +658,7 @@ func TestPreprocessorTime_Apply(t *testing.T) { contextLogger := common.Log.WithField("rule", "TestPreprocessorTime_Apply") ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger) for i, tt := range tests { - - pp := &Preprocessor{streamStmt: tt.stmt} - + pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields), timestampFormat: tt.stmt.Options["TIMESTAMP_FORMAT"]} dm := make(map[string]interface{}) if e := json.Unmarshal(tt.data, &dm); e != nil { log.Fatal(e) @@ -683,6 +681,17 @@ func TestPreprocessorTime_Apply(t *testing.T) { } } +func convertFields(o xsql.StreamFields) []interface{} { + if o == nil { + return nil + } + fields := make([]interface{}, len(o)) + for i, _ := range o { + fields[i] = &o[i] + } + return fields +} + func TestPreprocessorEventtime_Apply(t *testing.T) { var tests = []struct { @@ -830,9 +839,13 @@ func TestPreprocessorEventtime_Apply(t *testing.T) { ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger) for i, tt := range tests { - pp, err := NewPreprocessor(tt.stmt, nil, true, false) - if err != nil { - t.Error(err) + pp := &Preprocessor{ + streamFields: convertFields(tt.stmt.StreamFields), + aliasFields: nil, + isEventTime: true, + timestampField: tt.stmt.Options["TIMESTAMP"], + timestampFormat: tt.stmt.Options["TIMESTAMP_FORMAT"], + isBinary: false, } dm := make(map[string]interface{}) @@ -912,7 +925,7 @@ func TestPreprocessorError(t *testing.T) { ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger) for i, tt := range tests { - pp := &Preprocessor{streamStmt: tt.stmt} + pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields)} dm := make(map[string]interface{}) if e := json.Unmarshal(tt.data, &dm); e != nil { @@ -1039,7 +1052,7 @@ func TestPreprocessorForBinary(t *testing.T) { contextLogger := common.Log.WithField("rule", "TestPreprocessorForBinary") ctx := contexts.WithValue(contexts.Background(), contexts.LoggerKey, contextLogger) for i, tt := range tests { - pp := &Preprocessor{streamStmt: tt.stmt, isBinary: tt.isBinary} + pp := &Preprocessor{streamFields: convertFields(tt.stmt.StreamFields), isBinary: tt.isBinary} format := "json" if tt.isBinary { format = "binary" diff --git a/xstream/planner/aggregatePlan.go b/xstream/planner/aggregatePlan.go index eea76cb9a8..1bae1c2aee 100644 --- a/xstream/planner/aggregatePlan.go +++ b/xstream/planner/aggregatePlan.go @@ -12,3 +12,8 @@ func (p AggregatePlan) Init() *AggregatePlan { p.baseLogicalPlan.self = &p return &p } + +func (p *AggregatePlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.dimensions) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +} diff --git a/xstream/planner/dataSourcePlan.go b/xstream/planner/dataSourcePlan.go index 418a7fba7b..5bcbec2333 100644 --- a/xstream/planner/dataSourcePlan.go +++ b/xstream/planner/dataSourcePlan.go @@ -1,16 +1,32 @@ package planner -import "github.com/emqx/kuiper/xsql" +import ( + "fmt" + "github.com/emqx/kuiper/common" + "github.com/emqx/kuiper/xsql" + "sort" + "strings" +) type DataSourcePlan struct { baseLogicalPlan - name string + name string + // calculated properties + // initialized with stream definition, pruned with rule + streamFields []interface{} + metaFields []string + // passon properties + streamStmt *xsql.StreamStmt + alias xsql.Fields + allMeta bool + isBinary bool + iet bool + timestampFormat string + timestampField string + // intermediate status isWildCard bool - needMeta bool - // if is wildCard, leave it empty - fields xsql.Fields - metaFields xsql.Fields - alias xsql.Fields + fields map[string]interface{} + metaMap map[string]bool } func (p DataSourcePlan) Init() *DataSourcePlan { @@ -38,7 +54,7 @@ func (p *DataSourcePlan) extract(expr xsql.Expr) (xsql.Expr, xsql.Expr) { case 0: return expr, nil case 1: - if s[0] == p.name { + if s[0] == p.name || s[0] == "" { return expr, nil } else { return nil, expr @@ -54,3 +70,122 @@ func (p *DataSourcePlan) extract(expr xsql.Expr) (xsql.Expr, xsql.Expr) { return nil, expr } } + +func (p *DataSourcePlan) PruneColumns(fields []xsql.Expr) error { + //init values + p.getProps() + p.fields = make(map[string]interface{}) + if !p.allMeta { + p.metaMap = make(map[string]bool) + } + if p.timestampField != "" { + p.fields[p.timestampField] = p.timestampField + } + for _, field := range fields { + switch f := field.(type) { + case *xsql.Wildcard: + p.isWildCard = true + case *xsql.FieldRef: + if !p.isWildCard && (f.StreamName == "" || string(f.StreamName) == p.name) { + if _, ok := p.fields[f.Name]; !ok { + sf := p.getField(f.Name) + if sf != nil { + p.fields[f.Name] = sf + } + } + } + case *xsql.MetaRef: + if p.allMeta { + break + } + if f.StreamName == "" || string(f.StreamName) == p.name { + if f.Name == "*" { + p.allMeta = true + p.metaMap = nil + } else if !p.allMeta { + p.metaMap[f.Name] = true + } + } + case *xsql.SortField: + if !p.isWildCard { + sf := p.getField(f.Name) + if sf != nil { + p.fields[f.Name] = sf + } + } + default: + return fmt.Errorf("unsupported field %v", field) + } + } + p.getAllFields() + return nil +} + +func (p *DataSourcePlan) getField(name string) interface{} { + if p.streamStmt.StreamFields != nil { + for _, f := range p.streamStmt.StreamFields { // The input can only be StreamFields + if f.Name == name { + return &f + } + } + } else { + return name + } + return nil +} + +func (p *DataSourcePlan) getAllFields() { + // convert fields + p.streamFields = make([]interface{}, 0) + if p.isWildCard { + if p.streamStmt.StreamFields != nil { + for k, _ := range p.streamStmt.StreamFields { // The input can only be StreamFields + p.streamFields = append(p.streamFields, &p.streamStmt.StreamFields[k]) + } + } else { + p.streamFields = nil + } + } else { + sfs := make([]interface{}, 0, len(p.fields)) + if common.IsTesting { + var keys []string + for k, _ := range p.fields { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + sfs = append(sfs, p.fields[k]) + } + } else { + for _, v := range p.fields { + sfs = append(sfs, v) + } + } + p.streamFields = sfs + } + p.metaFields = make([]string, 0, len(p.metaMap)) + for k, _ := range p.metaMap { + p.metaFields = append(p.metaFields, k) + } + p.fields = nil + p.metaMap = nil +} + +func (p *DataSourcePlan) getProps() error { + if p.iet { + if tf, ok := p.streamStmt.Options["TIMESTAMP"]; ok { + p.timestampField = tf + } else { + return fmt.Errorf("preprocessor is set to be event time but stream option TIMESTAMP not found") + } + if ts, ok := p.streamStmt.Options["TIMESTAMP_FORMAT"]; ok { + p.timestampFormat = ts + } + } + if f, ok := p.streamStmt.Options["FORMAT"]; ok { + if strings.ToLower(f) == common.FORMAT_BINARY { + p.isBinary = true + } + } + return nil +} diff --git a/xstream/planner/filterPlan.go b/xstream/planner/filterPlan.go index 9b258952bd..3b036ac41b 100644 --- a/xstream/planner/filterPlan.go +++ b/xstream/planner/filterPlan.go @@ -32,3 +32,8 @@ func (p *FilterPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalP return nil, p } } + +func (p *FilterPlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.condition) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +} diff --git a/xstream/planner/havingPlan.go b/xstream/planner/havingPlan.go index 2e6c29fb89..c26b02128e 100644 --- a/xstream/planner/havingPlan.go +++ b/xstream/planner/havingPlan.go @@ -11,3 +11,8 @@ func (p HavingPlan) Init() *HavingPlan { p.baseLogicalPlan.self = &p return &p } + +func (p *HavingPlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.condition) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +} diff --git a/xstream/planner/joinPlan.go b/xstream/planner/joinPlan.go index 8712e54a28..8c4358596d 100644 --- a/xstream/planner/joinPlan.go +++ b/xstream/planner/joinPlan.go @@ -51,3 +51,8 @@ func extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql. //default case: all condition are unpushable return condition, nil } + +func (p *JoinPlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.joins) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +} diff --git a/xstream/planner/logicalPlan.go b/xstream/planner/logicalPlan.go index 29dea7a03a..56c1cf1904 100644 --- a/xstream/planner/logicalPlan.go +++ b/xstream/planner/logicalPlan.go @@ -9,6 +9,8 @@ type LogicalPlan interface { // It will accept a condition that is an expression slice, and return the expressions that can't be pushed. // It also return the new tree of plan as it can possibly change the tree PushDownPredicate(xsql.Expr) (xsql.Expr, LogicalPlan) + // Prune the unused columns in the data source level, by pushing all needed columns down + PruneColumns(fields []xsql.Expr) error } type baseLogicalPlan struct { @@ -39,3 +41,13 @@ func (p *baseLogicalPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, Log } return rest, p.self } + +func (p *baseLogicalPlan) PruneColumns(fields []xsql.Expr) error { + for _, child := range p.children { + err := child.PruneColumns(fields) + if err != nil { + return err + } + } + return nil +} diff --git a/xstream/planner/optimizer.go b/xstream/planner/optimizer.go index a1127a1ef6..129c191e54 100644 --- a/xstream/planner/optimizer.go +++ b/xstream/planner/optimizer.go @@ -1,6 +1,7 @@ package planner var optRuleList = []logicalOptRule{ + &columnPruner{}, &predicatePushDown{}, } diff --git a/xstream/planner/orderPlan.go b/xstream/planner/orderPlan.go index 7e329f86fe..6b8287a7f9 100644 --- a/xstream/planner/orderPlan.go +++ b/xstream/planner/orderPlan.go @@ -11,3 +11,8 @@ func (p OrderPlan) Init() *OrderPlan { p.baseLogicalPlan.self = &p return &p } + +func (p *OrderPlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.SortFields) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +} diff --git a/xstream/planner/planner.go b/xstream/planner/planner.go index 8458036769..bdb5283a50 100644 --- a/xstream/planner/planner.go +++ b/xstream/planner/planner.go @@ -40,25 +40,25 @@ func PlanWithSourcesAndSinks(rule *api.Rule, storePath string, sources []*nodes. } defer store.Close() // Create logical plan and optimize. Logical plans are a linked list - lp, err := createLogicalPlan(stmt, rule.Options) + lp, err := createLogicalPlan(stmt, rule.Options, store) if err != nil { return nil, err } - tp, err := createTopo(rule, lp, sources, sinks, store, streamsFromStmt) + tp, err := createTopo(rule, lp, sources, sinks, streamsFromStmt) if err != nil { return nil, err } return tp, nil } -func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, store common.KeyValue, streamsFromStmt []string) (*xstream.TopologyNew, error) { +func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sinks []*nodes.SinkNode, streamsFromStmt []string) (*xstream.TopologyNew, error) { // Create topology tp, err := xstream.NewWithNameAndQos(rule.Id, rule.Options.Qos, rule.Options.CheckpointInterval) if err != nil { return nil, err } - input, _, err := buildOps(lp, tp, rule.Options, sources, store, streamsFromStmt, 0) + input, _, err := buildOps(lp, tp, rule.Options, sources, streamsFromStmt, 0) if err != nil { return nil, err } @@ -83,11 +83,11 @@ func createTopo(rule *api.Rule, lp LogicalPlan, sources []*nodes.SourceNode, sin return tp, nil } -func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption, sources []*nodes.SourceNode, store common.KeyValue, streamsFromStmt []string, index int) (api.Emitter, int, error) { +func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption, sources []*nodes.SourceNode, streamsFromStmt []string, index int) (api.Emitter, int, error) { var inputs []api.Emitter newIndex := index for _, c := range lp.Children() { - input, ni, err := buildOps(c, tp, options, sources, store, streamsFromStmt, newIndex) + input, ni, err := buildOps(c, tp, options, sources, streamsFromStmt, newIndex) if err != nil { return nil, 0, err } @@ -101,23 +101,13 @@ func buildOps(lp LogicalPlan, tp *xstream.TopologyNew, options *api.RuleOption, ) switch t := lp.(type) { case *DataSourcePlan: - streamStmt, err := getStream(store, t.name) - if err != nil { - return nil, 0, fmt.Errorf("fail to get stream %s, please check if stream is created", t.name) - } - isBinary := false - if f, ok := streamStmt.Options["FORMAT"]; ok { - if strings.ToLower(f) == common.FORMAT_BINARY { - isBinary = true - } - } - pp, err := operators.NewPreprocessor(streamStmt, t.alias, options.IsEventTime, isBinary) + pp, err := operators.NewPreprocessor(t.streamFields, t.alias, t.allMeta, t.metaFields, t.iet, t.timestampField, t.timestampFormat, t.isBinary) if err != nil { return nil, 0, err } var srcNode *nodes.SourceNode if len(sources) == 0 { - node := nodes.NewSourceNode(t.name, streamStmt.Options) + node := nodes.NewSourceNode(t.name, t.streamStmt.Options) srcNode = node } else { found := false @@ -187,7 +177,7 @@ func getStream(m common.KeyValue, name string) (stmt *xsql.StreamStmt, err error return } -func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption) (LogicalPlan, error) { +func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption, store common.KeyValue) (LogicalPlan, error) { streamsFromStmt := xsql.GetStreams(stmt) dimensions := stmt.Dimensions var ( @@ -207,13 +197,16 @@ func createLogicalPlan(stmt *xsql.SelectStatement, opt *api.RuleOption) (Logical } } for _, s := range streamsFromStmt { + streamStmt, err := getStream(store, s) + if err != nil { + return nil, fmt.Errorf("fail to get stream %s, please check if stream is created", s) + } p = DataSourcePlan{ name: s, - isWildCard: true, - needMeta: opt.SendMetaToSink, - fields: nil, - metaFields: nil, + streamStmt: streamStmt, + iet: opt.IsEventTime, alias: alias, + allMeta: opt.SendMetaToSink, }.Init() children = append(children, p) } diff --git a/xstream/planner/planner_test.go b/xstream/planner/planner_test.go index dee40b1662..13c8bf12b9 100644 --- a/xstream/planner/planner_test.go +++ b/xstream/planner/planner_test.go @@ -2,31 +2,81 @@ package planner import ( "fmt" + "github.com/emqx/kuiper/common" "github.com/emqx/kuiper/xsql" "github.com/emqx/kuiper/xstream/api" + "path" "reflect" "strings" "testing" ) +var ( + DbDir = getDbDir() +) + +func getDbDir() string { + common.InitConf() + dbDir, err := common.GetDataLoc() + if err != nil { + common.Log.Panic(err) + } + common.Log.Infof("db location is %s", dbDir) + return dbDir +} + func Test_createLogicalPlan(t *testing.T) { + store := common.GetSqliteKVStore(path.Join(DbDir, "stream")) + err := store.Open() + if err != nil { + t.Error(err) + return + } + defer store.Close() + streamSqls := map[string]string{ + "src1": `CREATE STREAM src1 ( + id1 BIGINT, + temp BIGINT, + name string + ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`, + "src2": `CREATE STREAM src1 ( + id2 BIGINT, + hum BIGINT + ) WITH (DATASOURCE="src1", FORMAT="json", KEY="ts");`, + } + for name, sql := range streamSqls { + store.Set(name, sql) + } + streams := make(map[string]*xsql.StreamStmt) + for n, _ := range streamSqls { + streamStmt, err := getStream(store, n) + if err != nil { + t.Errorf("fail to get stream %s, please check if stream is created", n) + return + } + streams[n] = streamStmt + } var tests = []struct { sql string p LogicalPlan err string }{ { // 0 - sql: `SELECT name FROM tbl`, + sql: `SELECT name FROM src1`, p: ProjectPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ baseLogicalPlan: baseLogicalPlan{}, - name: "tbl", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src1", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "name", + FieldType: &xsql.BasicType{Type: xsql.STRINGS}, + }, + }, + streamStmt: streams["src1"], + metaFields: []string{}, }.Init(), }, }, @@ -40,7 +90,7 @@ func Test_createLogicalPlan(t *testing.T) { sendMeta: false, }.Init(), }, { // 1 optimize where to data source - sql: `SELECT abc FROM src1 WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`, + sql: `SELECT temp FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10)`, p: ProjectPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ @@ -51,16 +101,24 @@ func Test_createLogicalPlan(t *testing.T) { baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ - name: "src1", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src1", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "name", + FieldType: &xsql.BasicType{Type: xsql.STRINGS}, + }, + &xsql.StreamField{ + Name: "temp", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src1"], + metaFields: []string{}, }.Init(), }, }, condition: &xsql.BinaryExpr{ - LHS: &xsql.FieldRef{Name: "f1"}, + LHS: &xsql.FieldRef{Name: "name"}, OP: xsql.EQ, RHS: &xsql.StringLiteral{Val: "v1"}, }, @@ -77,8 +135,8 @@ func Test_createLogicalPlan(t *testing.T) { }, fields: []xsql.Field{ { - Expr: &xsql.FieldRef{Name: "abc"}, - Name: "abc", + Expr: &xsql.FieldRef{Name: "temp"}, + Name: "temp", AName: ""}, }, isAggregate: false, @@ -89,65 +147,78 @@ func Test_createLogicalPlan(t *testing.T) { p: ProjectPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ - FilterPlan{ + JoinPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ - JoinPlan{ + WindowPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ - WindowPlan{ - baseLogicalPlan: baseLogicalPlan{ - children: []LogicalPlan{ - DataSourcePlan{ - name: "src1", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, - }.Init(), - DataSourcePlan{ - name: "src2", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, - }.Init(), + DataSourcePlan{ + name: "src1", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "id1", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "temp", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, }, }, - condition: nil, - wtype: xsql.TUMBLING_WINDOW, - length: 10000, - interval: 0, - limit: 0, + streamStmt: streams["src1"], + metaFields: []string{}, + }.Init(), + DataSourcePlan{ + name: "src2", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "hum", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "id2", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src2"], + metaFields: []string{}, }.Init(), }, }, - from: &xsql.Table{Name: "src1"}, - joins: xsql.Joins{xsql.Join{ - Name: "src2", - JoinType: xsql.INNER_JOIN, - Expr: &xsql.BinaryExpr{ - OP: xsql.EQ, - LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"}, - RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"}, - }, - }}, + condition: nil, + wtype: xsql.TUMBLING_WINDOW, + length: 10000, + interval: 0, + limit: 0, }.Init(), }, }, - condition: &xsql.BinaryExpr{ - LHS: &xsql.BinaryExpr{ - OP: xsql.GT, - LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"}, - RHS: &xsql.IntegerLiteral{Val: 20}, - }, - OP: xsql.OR, - RHS: &xsql.BinaryExpr{ - OP: xsql.GT, - LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"}, - RHS: &xsql.IntegerLiteral{Val: 60}, + from: &xsql.Table{Name: "src1"}, + joins: xsql.Joins{xsql.Join{ + Name: "src2", + JoinType: xsql.INNER_JOIN, + Expr: &xsql.BinaryExpr{ + OP: xsql.AND, + LHS: &xsql.BinaryExpr{ + LHS: &xsql.BinaryExpr{ + OP: xsql.GT, + LHS: &xsql.FieldRef{Name: "temp", StreamName: "src1"}, + RHS: &xsql.IntegerLiteral{Val: 20}, + }, + OP: xsql.OR, + RHS: &xsql.BinaryExpr{ + OP: xsql.GT, + LHS: &xsql.FieldRef{Name: "hum", StreamName: "src2"}, + RHS: &xsql.IntegerLiteral{Val: 60}, + }, + }, + RHS: &xsql.BinaryExpr{ + OP: xsql.EQ, + LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"}, + RHS: &xsql.FieldRef{Name: "id2", StreamName: "src2"}, + }, }, - }, + }}, }.Init(), }, }, @@ -161,7 +232,7 @@ func Test_createLogicalPlan(t *testing.T) { sendMeta: false, }.Init(), }, { // 3 optimize window filter - sql: `SELECT abc FROM src1 WHERE f1 = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE size > 2)`, + sql: `SELECT id1 FROM src1 WHERE name = "v1" GROUP BY TUMBLINGWINDOW(ss, 10) FILTER( WHERE temp > 2)`, p: ProjectPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ @@ -172,23 +243,35 @@ func Test_createLogicalPlan(t *testing.T) { baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ - name: "src1", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src1", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "id1", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "name", + FieldType: &xsql.BasicType{Type: xsql.STRINGS}, + }, + &xsql.StreamField{ + Name: "temp", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src1"], + metaFields: []string{}, }.Init(), }, }, condition: &xsql.BinaryExpr{ OP: xsql.AND, LHS: &xsql.BinaryExpr{ - LHS: &xsql.FieldRef{Name: "f1"}, + LHS: &xsql.FieldRef{Name: "name"}, OP: xsql.EQ, RHS: &xsql.StringLiteral{Val: "v1"}, }, RHS: &xsql.BinaryExpr{ - LHS: &xsql.FieldRef{Name: "size"}, + LHS: &xsql.FieldRef{Name: "temp"}, OP: xsql.GT, RHS: &xsql.IntegerLiteral{Val: 2}, }, @@ -206,15 +289,15 @@ func Test_createLogicalPlan(t *testing.T) { }, fields: []xsql.Field{ { - Expr: &xsql.FieldRef{Name: "abc"}, - Name: "abc", + Expr: &xsql.FieldRef{Name: "id1"}, + Name: "id1", AName: ""}, }, isAggregate: false, sendMeta: false, }.Init(), }, { // 4. do not optimize count window - sql: `SELECT * FROM demo WHERE temperature > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`, + sql: `SELECT * FROM src1 WHERE temp > 20 GROUP BY COUNTWINDOW(5,1) HAVING COUNT(*) > 2`, p: ProjectPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ @@ -228,11 +311,24 @@ func Test_createLogicalPlan(t *testing.T) { baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ - name: "demo", + name: "src1", isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "id1", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "temp", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "name", + FieldType: &xsql.BasicType{Type: xsql.STRINGS}, + }, + }, + streamStmt: streams["src1"], + metaFields: []string{}, }.Init(), }, }, @@ -245,7 +341,7 @@ func Test_createLogicalPlan(t *testing.T) { }, }, condition: &xsql.BinaryExpr{ - LHS: &xsql.FieldRef{Name: "temperature"}, + LHS: &xsql.FieldRef{Name: "temp"}, OP: xsql.GT, RHS: &xsql.IntegerLiteral{Val: 20}, }, @@ -272,7 +368,7 @@ func Test_createLogicalPlan(t *testing.T) { sendMeta: false, }.Init(), }, { // 5. optimize join on - sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`, + sql: `SELECT id1 FROM src1 INNER JOIN src2 on src1.id1 = src2.id2 and src1.temp > 20 and src2.hum < 60 WHERE src1.id1 > 111 GROUP BY TUMBLINGWINDOW(ss, 10)`, p: ProjectPlan{ baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ @@ -286,11 +382,19 @@ func Test_createLogicalPlan(t *testing.T) { baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ - name: "src1", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src1", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "id1", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "temp", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src1"], + metaFields: []string{}, }.Init(), }, }, @@ -303,7 +407,7 @@ func Test_createLogicalPlan(t *testing.T) { OP: xsql.AND, LHS: &xsql.BinaryExpr{ OP: xsql.GT, - LHS: &xsql.FieldRef{Name: "id", StreamName: "src1"}, + LHS: &xsql.FieldRef{Name: "id1", StreamName: "src1"}, RHS: &xsql.IntegerLiteral{Val: 111}, }, }, @@ -312,11 +416,19 @@ func Test_createLogicalPlan(t *testing.T) { baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ - name: "src2", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src2", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "hum", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "id2", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src2"], + metaFields: []string{}, }.Init(), }, }, @@ -378,11 +490,19 @@ func Test_createLogicalPlan(t *testing.T) { baseLogicalPlan: baseLogicalPlan{ children: []LogicalPlan{ DataSourcePlan{ - name: "src1", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src1", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "id1", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "temp", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src1"], + metaFields: []string{}, }.Init(), }, }, @@ -393,11 +513,19 @@ func Test_createLogicalPlan(t *testing.T) { }, }.Init(), DataSourcePlan{ - name: "src2", - isWildCard: true, - needMeta: false, - fields: nil, - metaFields: nil, + name: "src2", + streamFields: []interface{}{ + &xsql.StreamField{ + Name: "hum", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + &xsql.StreamField{ + Name: "id2", + FieldType: &xsql.BasicType{Type: xsql.BIGINT}, + }, + }, + streamStmt: streams["src2"], + metaFields: []string{}, }.Init(), }, }, @@ -454,9 +582,9 @@ func Test_createLogicalPlan(t *testing.T) { }.Init(), }, } - //TODO optimize having, optimize on fmt.Printf("The test bucket size is %d.\n\n", len(tests)) - for i, tt := range tests[6:] { + + for i, tt := range tests { stmt, err := xsql.NewParser(strings.NewReader(tt.sql)).Parse() if err != nil { t.Errorf("%d. %q: error compile sql: %s\n", i, tt.sql, err) @@ -470,7 +598,7 @@ func Test_createLogicalPlan(t *testing.T) { SendMetaToSink: false, Qos: 0, CheckpointInterval: 0, - }) + }, store) if err != nil { t.Errorf("%d. %q\n\nerror:%v\n\n", i, tt.sql, err) } diff --git a/xstream/planner/projectPlan.go b/xstream/planner/projectPlan.go index 5c6d0f94bc..3cacbcd3fe 100644 --- a/xstream/planner/projectPlan.go +++ b/xstream/planner/projectPlan.go @@ -13,3 +13,8 @@ func (p ProjectPlan) Init() *ProjectPlan { p.baseLogicalPlan.self = &p return &p } + +func (p *ProjectPlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.fields) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +} diff --git a/xstream/planner/rules.go b/xstream/planner/rules.go index a7a23221bb..909ba78e23 100644 --- a/xstream/planner/rules.go +++ b/xstream/planner/rules.go @@ -15,3 +15,14 @@ func (r *predicatePushDown) optimize(lp LogicalPlan) (LogicalPlan, error) { func (r *predicatePushDown) name() string { return "predicatePushDown" } + +type columnPruner struct{} + +func (r *columnPruner) optimize(lp LogicalPlan) (LogicalPlan, error) { + err := lp.PruneColumns(nil) + return lp, err +} + +func (r *columnPruner) name() string { + return "columnPruner" +} diff --git a/xstream/planner/util.go b/xstream/planner/util.go index d59347a785..e9bff80edf 100644 --- a/xstream/planner/util.go +++ b/xstream/planner/util.go @@ -32,3 +32,20 @@ func combine(l xsql.Expr, r xsql.Expr) xsql.Expr { return r } } + +func getFields(node xsql.Node) []xsql.Expr { + result := make([]xsql.Expr, 0) + xsql.WalkFunc(node, func(n xsql.Node) { + switch t := n.(type) { + case *xsql.FieldRef: + result = append(result, t) + case *xsql.Wildcard: + result = append(result, t) + case *xsql.MetaRef: + result = append(result, t) + case *xsql.SortField: + result = append(result, t) + } + }) + return result +} diff --git a/xstream/planner/windowPlan.go b/xstream/planner/windowPlan.go index e19cf7b96f..7ebd5fe925 100644 --- a/xstream/planner/windowPlan.go +++ b/xstream/planner/windowPlan.go @@ -35,3 +35,8 @@ func (p *WindowPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalP return nil, p } } + +func (p *WindowPlan) PruneColumns(fields []xsql.Expr) error { + f := getFields(p.condition) + return p.baseLogicalPlan.PruneColumns(append(fields, f...)) +}