Skip to content

Commit

Permalink
feat(sql): column pruning
Browse files (browse the repository at this point in the history)
  • Loading branch information
ngjaying committed Jan 6, 2021
1 parent 106193a commit c14dbd2
Show file tree
Hide file tree
Showing 21 changed files with 598 additions and 283 deletions.
1 change: 1 addition & 0 deletions xsql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func Walk(v Visitor, node Node) {
Walk(v, n.Joins)
Walk(v, n.Condition)
Walk(v, n.SortFields)
Walk(v, n.Having)

case SortFields:
for _, sf := range n {
Expand Down
2 changes: 1 addition & 1 deletion xsql/ast_agg_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestIsAggStatement(t *testing.T) {
{s: `SELECT max(f1) FROM tbl`, agg: true},
{s: `SELECT min(f1) FROM tbl`, agg: true},
{s: `SELECT count(f1) FROM tbl group by tumblingwindow(ss, 5)`, agg: true},

{s: `SELECT f1 FROM tbl group by tumblingwindow(ss, 5) having count(f1) > 3`, agg: false},
{s: `SELECT f1 FROM tbl left join tbl2 on tbl1.f1 = tbl2.f2`, agg: false},
}

Expand Down
21 changes: 11 additions & 10 deletions xsql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,21 @@ func (fv *FunctionValuer) Call(name string, args []interface{}) (interface{}, bo
}
}

// NOTE(review): this span is a rendered diff hunk whose +/- markers were lost.
// Going by the "11 additions & 10 deletions" summary for xsql/functions.go,
// the next four lines appear to be the REMOVED head of the old implementation,
// which accepted any Node and walked the whole tree.
func IsAggStatement(node Node) bool {
var r = false
WalkFunc(node, func(n Node) {
if f, ok := n.(*Call); ok {
// IsAggStatement reports whether stmt is an aggregate statement.
//
// It returns true immediately when stmt.Dimensions is non-nil and its
// GetGroups() result is non-empty. Otherwise it walks stmt.Fields with
// WalkFunc and returns true if any visited *Call satisfies isAggFunc.
// Only Dimensions and Fields are inspected by this function.
//
// NOTE(review): the lines from here through "case *Call:" appear to be the
// ADDED head of the new implementation.
func IsAggStatement(stmt *SelectStatement) bool {
if stmt.Dimensions != nil {
ds := stmt.Dimensions.GetGroups()
// NOTE(review): if ds is a slice or map, the nil check is redundant
// because len of a nil slice/map is 0 — confirm the type of GetGroups().
if ds != nil && len(ds) > 0 {
return true
}
}
r := false
WalkFunc(stmt.Fields, func(n Node) {
switch f := n.(type) {
case *Call:
// Shared context lines (present in both the old and new versions).
if ok := isAggFunc(f); ok {
r = true
// This return only leaves the callback for the current node; whether
// WalkFunc keeps visiting other nodes is not visible in this hunk.
return
}
// NOTE(review): the next six lines appear to be the REMOVED Dimensions branch
// of the old implementation; the new version performs this check up front on
// stmt.Dimensions instead.
} else if d, ok := n.(Dimensions); ok {
ds := d.GetGroups()
if ds != nil && len(ds) > 0 {
r = true
return
}
}
})
return r
Expand Down
59 changes: 9 additions & 50 deletions xsql/processors/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,71 +149,30 @@ func TestSingleSQL(t *testing.T) {
sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
r: [][]map[string]interface{}{
{{
"error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
"error": "error in preprocessor: invalid data type for size, expect bigint but found string(red)",
}},
{{
"Int8": float64(6),
"ts": float64(1541152486822),
}},
{{
"error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
}},
{{
"error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
}},
},
m: map[string]interface{}{
"op_1_preprocessor_demoError_0_exceptions_total": int64(3),
"op_1_preprocessor_demoError_0_process_latency_us": int64(0),
"op_1_preprocessor_demoError_0_records_in_total": int64(5),
"op_1_preprocessor_demoError_0_records_out_total": int64(2),

"op_3_project_0_exceptions_total": int64(3),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(4),
"op_3_project_0_records_out_total": int64(1),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(4),
"sink_mockSink_0_records_out_total": int64(4),

"source_demoError_0_exceptions_total": int64(0),
"source_demoError_0_records_in_total": int64(5),
"source_demoError_0_records_out_total": int64(5),

"op_2_filter_0_exceptions_total": int64(3),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(1),
},
}, {
name: `TestSingleSQLRule4`,
sql: `SELECT size as Int8, ts FROM demoError where size > 3`,
r: [][]map[string]interface{}{
{{
"error": "error in preprocessor: invalid data type for color, expect string but found int(3)",
}},
{{
"Int8": float64(6),
"ts": float64(1541152486822),
}},
{{
"error": "error in preprocessor: invalid data type for color, expect string but found int(7)",
"Int8": float64(4),
"ts": float64(1541152488442),
}},
{{
"error": "error in preprocessor: invalid data type for size, expect bigint but found string(blue)",
}},
},
m: map[string]interface{}{
"op_1_preprocessor_demoError_0_exceptions_total": int64(3),
"op_1_preprocessor_demoError_0_exceptions_total": int64(2),
"op_1_preprocessor_demoError_0_process_latency_us": int64(0),
"op_1_preprocessor_demoError_0_records_in_total": int64(5),
"op_1_preprocessor_demoError_0_records_out_total": int64(2),
"op_1_preprocessor_demoError_0_records_out_total": int64(3),

"op_3_project_0_exceptions_total": int64(3),
"op_3_project_0_exceptions_total": int64(2),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(4),
"op_3_project_0_records_out_total": int64(1),
"op_3_project_0_records_out_total": int64(2),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(4),
Expand All @@ -223,10 +182,10 @@ func TestSingleSQL(t *testing.T) {
"source_demoError_0_records_in_total": int64(5),
"source_demoError_0_records_out_total": int64(5),

"op_2_filter_0_exceptions_total": int64(3),
"op_2_filter_0_exceptions_total": int64(2),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(1),
"op_2_filter_0_records_out_total": int64(2),
},
}, {
name: `TestSingleSQLRule5`,
Expand Down
110 changes: 55 additions & 55 deletions xsql/processors/window_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ func TestWindow(t *testing.T) {
"op_1_preprocessor_demo_0_records_in_total": int64(5),
"op_1_preprocessor_demo_0_records_out_total": int64(5),

"op_3_project_0_exceptions_total": int64(0),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(2),
"op_3_project_0_records_out_total": int64(2),
"op_4_project_0_exceptions_total": int64(0),
"op_4_project_0_process_latency_us": int64(0),
"op_4_project_0_records_in_total": int64(2),
"op_4_project_0_records_out_total": int64(2),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(2),
Expand All @@ -115,15 +115,15 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_in_total": int64(5),
"source_demo_0_records_out_total": int64(5),

"op_2_window_0_exceptions_total": int64(0),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(2),
"op_3_window_0_exceptions_total": int64(0),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(2),

"op_2_windowFilter_0_exceptions_total": int64(0),
"op_2_windowFilter_0_process_latency_us": int64(0),
"op_2_windowFilter_0_records_in_total": int64(5),
"op_2_windowFilter_0_records_out_total": int64(3),
"op_2_filter_0_exceptions_total": int64(0),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(3),
},
}, {
name: `TestWindowRule3`,
Expand Down Expand Up @@ -474,10 +474,10 @@ func TestWindow(t *testing.T) {
"op_1_preprocessor_demo_0_records_in_total": int64(5),
"op_1_preprocessor_demo_0_records_out_total": int64(5),

"op_5_project_0_exceptions_total": int64(0),
"op_5_project_0_process_latency_us": int64(0),
"op_5_project_0_records_in_total": int64(1),
"op_5_project_0_records_out_total": int64(1),
"op_6_project_0_exceptions_total": int64(0),
"op_6_project_0_process_latency_us": int64(0),
"op_6_project_0_records_in_total": int64(1),
"op_6_project_0_records_out_total": int64(1),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(1),
Expand All @@ -487,25 +487,25 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_in_total": int64(5),
"source_demo_0_records_out_total": int64(5),

"op_2_window_0_exceptions_total": int64(0),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(2),

"op_2_windowFilter_0_exceptions_total": int64(0),
"op_2_windowFilter_0_process_latency_us": int64(0),
"op_2_windowFilter_0_records_in_total": int64(5),
"op_2_windowFilter_0_records_out_total": int64(3),

"op_3_aggregate_0_exceptions_total": int64(0),
"op_3_aggregate_0_process_latency_us": int64(0),
"op_3_aggregate_0_records_in_total": int64(2),
"op_3_aggregate_0_records_out_total": int64(2),

"op_4_having_0_exceptions_total": int64(0),
"op_4_having_0_process_latency_us": int64(0),
"op_4_having_0_records_in_total": int64(2),
"op_4_having_0_records_out_total": int64(1),
"op_3_window_0_exceptions_total": int64(0),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(2),

"op_2_filter_0_exceptions_total": int64(0),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(3),

"op_4_aggregate_0_exceptions_total": int64(0),
"op_4_aggregate_0_process_latency_us": int64(0),
"op_4_aggregate_0_records_in_total": int64(2),
"op_4_aggregate_0_records_out_total": int64(2),

"op_5_having_0_exceptions_total": int64(0),
"op_5_having_0_process_latency_us": int64(0),
"op_5_having_0_records_in_total": int64(2),
"op_5_having_0_records_out_total": int64(1),
},
}, {
name: `TestWindowRule9`,
Expand Down Expand Up @@ -546,10 +546,10 @@ func TestWindow(t *testing.T) {
"op_1_preprocessor_demo_0_records_in_total": int64(5),
"op_1_preprocessor_demo_0_records_out_total": int64(5),

"op_3_project_0_exceptions_total": int64(0),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(4),
"op_3_project_0_records_out_total": int64(4),
"op_4_project_0_exceptions_total": int64(0),
"op_4_project_0_process_latency_us": int64(0),
"op_4_project_0_records_in_total": int64(4),
"op_4_project_0_records_out_total": int64(4),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(4),
Expand All @@ -559,10 +559,10 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_in_total": int64(5),
"source_demo_0_records_out_total": int64(5),

"op_2_window_0_exceptions_total": int64(0),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(4),
"op_3_window_0_exceptions_total": int64(0),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(4),
},
}, {
name: `TestCountWindowRule1`,
Expand Down Expand Up @@ -1144,10 +1144,10 @@ func TestWindowError(t *testing.T) {
"op_1_preprocessor_ldemo_0_records_in_total": int64(5),
"op_1_preprocessor_ldemo_0_records_out_total": int64(5),

"op_3_project_0_exceptions_total": int64(1),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(3),
"op_3_project_0_records_out_total": int64(2),
"op_4_project_0_exceptions_total": int64(1),
"op_4_project_0_process_latency_us": int64(0),
"op_4_project_0_records_in_total": int64(3),
"op_4_project_0_records_out_total": int64(2),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(3),
Expand All @@ -1157,15 +1157,15 @@ func TestWindowError(t *testing.T) {
"source_ldemo_0_records_in_total": int64(5),
"source_ldemo_0_records_out_total": int64(5),

"op_2_window_0_exceptions_total": int64(1),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(2),
"op_3_window_0_exceptions_total": int64(1),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(2),

"op_2_windowFilter_0_exceptions_total": int64(1),
"op_2_windowFilter_0_process_latency_us": int64(0),
"op_2_windowFilter_0_records_in_total": int64(5),
"op_2_windowFilter_0_records_out_total": int64(2),
"op_2_filter_0_exceptions_total": int64(1),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(2),
},
}, {
name: `TestWindowErrorRule3`,
Expand Down
Loading

0 comments on commit c14dbd2

Please sign in to comment.