Skip to content

Commit

Permalink
feat(sql): join on optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
ngjaying committed Jan 6, 2021
1 parent 7f71ccf commit 106193a
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 101 deletions.
5 changes: 5 additions & 0 deletions xsql/processors/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func compareMetrics(tp *xstream.TopologyNew, m map[string]interface{}) (err erro
if matched {
continue
}
if common.Config.Basic.Debug == true {
for i, k := range keys {
log.Printf("%s:%v", k, values[i])
}
}
//do not find
if index < len(values) {
return fmt.Errorf("metrics mismatch for %s:\n\nexp=%#v(%t)\n\ngot=%#v(%t)\n\n", k, v, v, values[index], values[index])
Expand Down
110 changes: 55 additions & 55 deletions xsql/processors/window_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ func TestWindow(t *testing.T) {
"op_1_preprocessor_demo_0_records_in_total": int64(5),
"op_1_preprocessor_demo_0_records_out_total": int64(5),

"op_4_project_0_exceptions_total": int64(0),
"op_4_project_0_process_latency_us": int64(0),
"op_4_project_0_records_in_total": int64(2),
"op_4_project_0_records_out_total": int64(2),
"op_3_project_0_exceptions_total": int64(0),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(2),
"op_3_project_0_records_out_total": int64(2),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(2),
Expand All @@ -115,15 +115,15 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_in_total": int64(5),
"source_demo_0_records_out_total": int64(5),

"op_3_window_0_exceptions_total": int64(0),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(2),
"op_2_window_0_exceptions_total": int64(0),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(2),

"op_2_filter_0_exceptions_total": int64(0),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(3),
"op_2_windowFilter_0_exceptions_total": int64(0),
"op_2_windowFilter_0_process_latency_us": int64(0),
"op_2_windowFilter_0_records_in_total": int64(5),
"op_2_windowFilter_0_records_out_total": int64(3),
},
}, {
name: `TestWindowRule3`,
Expand Down Expand Up @@ -474,10 +474,10 @@ func TestWindow(t *testing.T) {
"op_1_preprocessor_demo_0_records_in_total": int64(5),
"op_1_preprocessor_demo_0_records_out_total": int64(5),

"op_6_project_0_exceptions_total": int64(0),
"op_6_project_0_process_latency_us": int64(0),
"op_6_project_0_records_in_total": int64(1),
"op_6_project_0_records_out_total": int64(1),
"op_5_project_0_exceptions_total": int64(0),
"op_5_project_0_process_latency_us": int64(0),
"op_5_project_0_records_in_total": int64(1),
"op_5_project_0_records_out_total": int64(1),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(1),
Expand All @@ -487,25 +487,25 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_in_total": int64(5),
"source_demo_0_records_out_total": int64(5),

"op_3_window_0_exceptions_total": int64(0),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(2),

"op_2_filter_0_exceptions_total": int64(0),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(3),

"op_4_aggregate_0_exceptions_total": int64(0),
"op_4_aggregate_0_process_latency_us": int64(0),
"op_4_aggregate_0_records_in_total": int64(2),
"op_4_aggregate_0_records_out_total": int64(2),

"op_5_having_0_exceptions_total": int64(0),
"op_5_having_0_process_latency_us": int64(0),
"op_5_having_0_records_in_total": int64(2),
"op_5_having_0_records_out_total": int64(1),
"op_2_window_0_exceptions_total": int64(0),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(2),

"op_2_windowFilter_0_exceptions_total": int64(0),
"op_2_windowFilter_0_process_latency_us": int64(0),
"op_2_windowFilter_0_records_in_total": int64(5),
"op_2_windowFilter_0_records_out_total": int64(3),

"op_3_aggregate_0_exceptions_total": int64(0),
"op_3_aggregate_0_process_latency_us": int64(0),
"op_3_aggregate_0_records_in_total": int64(2),
"op_3_aggregate_0_records_out_total": int64(2),

"op_4_having_0_exceptions_total": int64(0),
"op_4_having_0_process_latency_us": int64(0),
"op_4_having_0_records_in_total": int64(2),
"op_4_having_0_records_out_total": int64(1),
},
}, {
name: `TestWindowRule9`,
Expand Down Expand Up @@ -546,10 +546,10 @@ func TestWindow(t *testing.T) {
"op_1_preprocessor_demo_0_records_in_total": int64(5),
"op_1_preprocessor_demo_0_records_out_total": int64(5),

"op_4_project_0_exceptions_total": int64(0),
"op_4_project_0_process_latency_us": int64(0),
"op_4_project_0_records_in_total": int64(4),
"op_4_project_0_records_out_total": int64(4),
"op_3_project_0_exceptions_total": int64(0),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(4),
"op_3_project_0_records_out_total": int64(4),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(4),
Expand All @@ -559,10 +559,10 @@ func TestWindow(t *testing.T) {
"source_demo_0_records_in_total": int64(5),
"source_demo_0_records_out_total": int64(5),

"op_3_window_0_exceptions_total": int64(0),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(4),
"op_2_window_0_exceptions_total": int64(0),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(4),
},
}, {
name: `TestCountWindowRule1`,
Expand Down Expand Up @@ -1144,10 +1144,10 @@ func TestWindowError(t *testing.T) {
"op_1_preprocessor_ldemo_0_records_in_total": int64(5),
"op_1_preprocessor_ldemo_0_records_out_total": int64(5),

"op_4_project_0_exceptions_total": int64(1),
"op_4_project_0_process_latency_us": int64(0),
"op_4_project_0_records_in_total": int64(3),
"op_4_project_0_records_out_total": int64(2),
"op_3_project_0_exceptions_total": int64(1),
"op_3_project_0_process_latency_us": int64(0),
"op_3_project_0_records_in_total": int64(3),
"op_3_project_0_records_out_total": int64(2),

"sink_mockSink_0_exceptions_total": int64(0),
"sink_mockSink_0_records_in_total": int64(3),
Expand All @@ -1157,15 +1157,15 @@ func TestWindowError(t *testing.T) {
"source_ldemo_0_records_in_total": int64(5),
"source_ldemo_0_records_out_total": int64(5),

"op_3_window_0_exceptions_total": int64(1),
"op_3_window_0_process_latency_us": int64(0),
"op_3_window_0_records_in_total": int64(3),
"op_3_window_0_records_out_total": int64(2),
"op_2_window_0_exceptions_total": int64(1),
"op_2_window_0_process_latency_us": int64(0),
"op_2_window_0_records_in_total": int64(3),
"op_2_window_0_records_out_total": int64(2),

"op_2_filter_0_exceptions_total": int64(1),
"op_2_filter_0_process_latency_us": int64(0),
"op_2_filter_0_records_in_total": int64(5),
"op_2_filter_0_records_out_total": int64(2),
"op_2_windowFilter_0_exceptions_total": int64(1),
"op_2_windowFilter_0_process_latency_us": int64(0),
"op_2_windowFilter_0_records_in_total": int64(5),
"op_2_windowFilter_0_records_out_total": int64(2),
},
}, {
name: `TestWindowErrorRule3`,
Expand Down
32 changes: 28 additions & 4 deletions xstream/planner/dataSourcePlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,37 @@ func (p DataSourcePlan) Init() *DataSourcePlan {

// PushDownPredicate asks p.extract to split condition into the part owned by
// this data source and the remainder. When an owned part exists it is placed
// in a new FilterPlan whose only child is p, and that filter is returned as
// the plan; the remainder is handed back to the caller as the first result.
// Presume no children for data source
func (p *DataSourcePlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
if condition != nil {
owned, other := p.extract(condition)
if owned != nil {
// Add a filter plan for children
f := FilterPlan{
condition: condition,
condition: owned,
}.Init()
f.SetChildren([]LogicalPlan{p})
return nil, f
return other, f
}
// Nothing is owned by this source: the plan stays as it is and the
// remainder goes back to the caller.
return other, p
}

// extract partitions expr into two results: the first is the part this data
// source keeps for itself, the second is the part returned to the caller.
// The split is driven by the entries getRefSources reports for expr.
func (p *DataSourcePlan) extract(expr xsql.Expr) (xsql.Expr, xsql.Expr) {
s := getRefSources(expr)
switch len(s) {
case 0:
// getRefSources reported nothing: the whole expression is kept here.
return expr, nil
case 1:
// Exactly one entry: keep the expression only when it names this source.
if s[0] == p.name {
return expr, nil
} else {
return nil, expr
}
default:
// Several entries: only an AND expression is split, each operand
// recursively, and the pieces are recombined per side with combine.
if be, ok := expr.(*xsql.BinaryExpr); ok && be.OP == xsql.AND {
ul, pl := p.extract(be.LHS)
ur, pr := p.extract(be.RHS)
owned := combine(ul, ur)
other := combine(pl, pr)
return owned, other
}
// Any other expression is returned whole as the second result.
return nil, expr
}
return nil, p
}
42 changes: 3 additions & 39 deletions xstream/planner/filterPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ func (p *FilterPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalP
p.condition = a
return nil, p
}
// if has child, try to move pushable condition out
up, pp := extractCondition(a)

rest, _ := p.baseLogicalPlan.PushDownPredicate(pp)
rest, _ := p.baseLogicalPlan.PushDownPredicate(a)

up = combine(up, rest)
if up != nil {
p.condition = up
if rest != nil {
p.condition = rest
return nil, p
} else if len(p.children) == 1 {
// eliminate this filter
Expand All @@ -35,36 +32,3 @@ func (p *FilterPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalP
return nil, p
}
}

// extractCondition splits condition into the part that must stay where it is
// (unpushable) and the part that may be pushed down (pushable).
//
// When GetRefSources reports fewer than two entries the whole condition is
// pushable. Otherwise only an AND expression is split: both operands are
// processed recursively and the pieces are recombined per side with combine.
// Any other expression is returned whole as unpushable.
func extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql.Expr) {
	if refs := GetRefSources(condition); len(refs) < 2 {
		return nil, condition
	}
	and, isBinary := condition.(*xsql.BinaryExpr)
	if !isBinary || and.OP != xsql.AND {
		// Default case: the whole condition is unpushable.
		return condition, nil
	}
	keepLeft, pushLeft := extractCondition(and.LHS)
	keepRight, pushRight := extractCondition(and.RHS)
	return combine(keepLeft, keepRight), combine(pushLeft, pushRight)
}

// combine merges two optional expressions. If either side is nil the other
// side is returned unchanged (which may itself be nil); when both are present
// they are joined into a new AND BinaryExpr with l on the left and r on the
// right.
func combine(l xsql.Expr, r xsql.Expr) xsql.Expr {
	switch {
	case l == nil:
		return r
	case r == nil:
		return l
	default:
		return &xsql.BinaryExpr{
			OP:  xsql.AND,
			LHS: l,
			RHS: r,
		}
	}
}
39 changes: 39 additions & 0 deletions xstream/planner/joinPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,42 @@ func (p JoinPlan) Init() *JoinPlan {
p.baseLogicalPlan.self = &p
return &p
}

// PushDownPredicate pushes predicates through the join plan and returns the
// condition the caller must still apply together with the plan to use.
//
// Only the first entry of p.joins is inspected. For an inner join, condition
// is merged with the join expression, the part extractCondition marks as
// pushable is offered to the base plan, and whatever remains becomes the new
// join expression, so nil is returned. For every other join type only
// condition itself is split and offered to the base plan; the join expression
// is left untouched and everything not pushed is returned to the caller.
func (p *JoinPlan) PushDownPredicate(condition xsql.Expr) (xsql.Expr, LogicalPlan) {
	//TODO multiple join support
	//Assume only one join
	first := p.joins[0]
	if first.JoinType != xsql.INNER_JOIN {
		//TODO fine grain handling for left/right join
		multiSource, singleSource := extractCondition(condition)
		remaining, _ := p.baseLogicalPlan.PushDownPredicate(singleSource)
		// never swallow anything
		return combine(multiSource, remaining), p
	}

	merged := combine(condition, first.Expr)
	multiSource, singleSource := extractCondition(merged)
	remaining, _ := p.baseLogicalPlan.PushDownPredicate(singleSource)
	// always swallow all conditions
	first.Expr = combine(multiSource, remaining)
	// first is a local copy, so write it back into the slice.
	p.joins[0] = first
	return nil, p
}

// extractCondition splits condition into the part that must stay where it is
// (unpushable) and the part that may be pushed down (pushable).
//
// When getRefSources reports fewer than two entries the whole condition is
// pushable. Otherwise only an AND expression is split: both operands are
// processed recursively and the pieces are recombined per side with combine.
// Any other expression is returned whole as unpushable.
func extractCondition(condition xsql.Expr) (unpushable xsql.Expr, pushable xsql.Expr) {
	if refs := getRefSources(condition); len(refs) < 2 {
		return nil, condition
	}
	and, isBinary := condition.(*xsql.BinaryExpr)
	if !isBinary || and.OP != xsql.AND {
		// Default case: the whole condition is unpushable.
		return condition, nil
	}
	keepLeft, pushLeft := extractCondition(and.LHS)
	keepRight, pushRight := extractCondition(and.RHS)
	return combine(keepLeft, keepRight), combine(pushLeft, pushRight)
}
Loading

0 comments on commit 106193a

Please sign in to comment.