Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix the error that is raised when executing the window function with range type frame (#46927) #47098

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4727,6 +4727,8 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor {
exec.orderByCols = orderByCols
exec.expectedCmpResult = cmpResult
exec.isRangeFrame = true
exec.start.InitCompareCols(b.ctx, exec.orderByCols)
exec.end.InitCompareCols(b.ctx, exec.orderByCols)
}
}
return exec
Expand All @@ -4749,14 +4751,19 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor {
if len(v.OrderBy) > 0 && v.OrderBy[0].Desc {
cmpResult = 1
}
processor = &rangeFrameWindowProcessor{
tmpProcessor := &rangeFrameWindowProcessor{
windowFuncs: windowFuncs,
partialResults: partialResults,
start: v.Frame.Start,
end: v.Frame.End,
orderByCols: orderByCols,
expectedCmpResult: cmpResult,
}

tmpProcessor.start.InitCompareCols(b.ctx, orderByCols)
tmpProcessor.end.InitCompareCols(b.ctx, orderByCols)

processor = tmpProcessor
}
return &WindowExec{baseExecutor: base,
processor: processor,
Expand Down
4 changes: 2 additions & 2 deletions executor/pipelined_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (e *PipelinedWindowExec) getStart(ctx sessionctx.Context) (uint64, error) {
var res int64
var err error
for i := range e.orderByCols {
res, _, err = e.start.CmpFuncs[i](ctx, e.orderByCols[i], e.start.CalcFuncs[i], e.getRow(start), e.getRow(e.curRowIdx))
res, _, err = e.start.CmpFuncs[i](ctx, e.start.CompareCols[i], e.start.CalcFuncs[i], e.getRow(start), e.getRow(e.curRowIdx))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -302,7 +302,7 @@ func (e *PipelinedWindowExec) getEnd(ctx sessionctx.Context) (uint64, error) {
var res int64
var err error
for i := range e.orderByCols {
res, _, err = e.end.CmpFuncs[i](ctx, e.end.CalcFuncs[i], e.orderByCols[i], e.getRow(e.curRowIdx), e.getRow(end))
res, _, err = e.end.CmpFuncs[i](ctx, e.end.CalcFuncs[i], e.end.CompareCols[i], e.getRow(e.curRowIdx), e.getRow(end))
if err != nil {
return 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (p *rangeFrameWindowProcessor) getStartOffset(ctx sessionctx.Context, rows
var res int64
var err error
for i := range p.orderByCols {
res, _, err = p.start.CmpFuncs[i](ctx, p.orderByCols[i], p.start.CalcFuncs[i], rows[p.lastStartOffset], rows[p.curRowIdx])
res, _, err = p.start.CmpFuncs[i](ctx, p.start.CompareCols[i], p.start.CalcFuncs[i], rows[p.lastStartOffset], rows[p.curRowIdx])
if err != nil {
return 0, err
}
Expand All @@ -423,7 +423,7 @@ func (p *rangeFrameWindowProcessor) getEndOffset(ctx sessionctx.Context, rows []
var res int64
var err error
for i := range p.orderByCols {
res, _, err = p.end.CmpFuncs[i](ctx, p.end.CalcFuncs[i], p.orderByCols[i], rows[p.curRowIdx], rows[p.lastEndOffset])
res, _, err = p.end.CmpFuncs[i](ctx, p.end.CalcFuncs[i], p.end.CompareCols[i], rows[p.curRowIdx], rows[p.lastEndOffset])
if err != nil {
return 0, err
}
Expand Down
13 changes: 13 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8422,6 +8422,7 @@ func TestIssue45033(t *testing.T) {
where alias4.c2 = alias2.alias_col1);`).Check(testkit.Rows("0"))
}

<<<<<<< HEAD
func TestIssue43116(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand All @@ -8433,4 +8434,16 @@ func TestIssue43116(t *testing.T) {
"└─Selection 8000.00 cop[tikv] in(test.sbtest1.id, 1, 1, 1, 1, 1), in(test.sbtest1.pad, \"1\", \"1\", \"1\", \"1\", \"1\")",
" └─TableFullScan 10000.00 cop[tikv] table:a keep order:false, stats:pseudo"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 111 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))
=======
func TestIssue46298(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test.first_range;")
tk.MustExec("create table test.first_range(p int not null, o tinyint not null, v int not null);")
tk.MustExec("insert into test.first_range (p, o, v) values (0, 0, 0), (1, 1, 1), (1, 2, 2), (1, 4, 4), (1, 8, 8), (2, 0, 0), (2, 3, 3), (2, 10, 10), (2, 13, 13), (2, 15, 15), (3, 1, 1), (3, 3, 3), (3, 5, 5), (3, 9, 9), (3, 15, 15), (3, 20, 20), (3, 31, 31);")
tk.MustQuery("select *, first_value(v) over (partition by p order by o range between 3.1 preceding and 2.9 following) as a from test.first_range;")
tk.MustExec(`set @@tidb_enable_pipelined_window_function=0`)
tk.MustQuery("select *, first_value(v) over (partition by p order by o range between 3.1 preceding and 2.9 following) as a from test.first_range;")
>>>>>>> b3ec110e187 (executor: fix the error that is raised when executing the window function with range type frame (#46927))
}
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6436,6 +6436,7 @@ func (b *PlanBuilder) buildWindowFunctionFrameBound(_ context.Context, spec *ast
if err != nil {
return nil, err
}

bound.CmpFuncs[0] = expression.GetCmpFunction(b.ctx, orderByItems[0].Col, bound.CalcFuncs[0])
return bound, nil
}
Expand Down
19 changes: 19 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,8 @@ type FrameBound struct {
// We will build the date_add or date_sub functions for frames like `INTERVAL '2:30' MINUTE_SECOND FOLLOWING`,
// and plus or minus for frames like `1 preceding`.
CalcFuncs []expression.Expression
// Sometimes we need to cast order by column to a specific type when frame type is range
CompareCols []expression.Expression
// CmpFuncs is used to decide whether one row is included in the current frame.
CmpFuncs []expression.CompareFunc
}
Expand All @@ -1741,6 +1743,23 @@ func (fb *FrameBound) Clone() *FrameBound {
return cloned
}

// InitCompareCols will init CompareCols
func (fb *FrameBound) InitCompareCols(ctx sessionctx.Context, orderByCols []*expression.Column) {
if len(fb.CalcFuncs) > 0 {
fb.CompareCols = make([]expression.Expression, len(orderByCols))
if fb.CalcFuncs[0].GetType().EvalType() != orderByCols[0].GetType().EvalType() {
fb.CompareCols[0], _ = expression.NewFunctionBase(ctx, ast.Cast, fb.CalcFuncs[0].GetType(), orderByCols[0])

// As compare column has been converted, compare function should also be changed
fb.CmpFuncs[0] = expression.GetCmpFunction(ctx, fb.CompareCols[0], fb.CalcFuncs[0])
} else {
for i, col := range orderByCols {
fb.CompareCols[i] = col
}
}
}
}

// LogicalWindow represents a logical window function plan.
type LogicalWindow struct {
logicalSchemaProducer
Expand Down