Skip to content

Commit

Permalink
executor: Fix crash during sort spill (#47581) (#47627)
Browse files Browse the repository at this point in the history
close #47538
  • Loading branch information
ti-chi-bot authored Oct 17, 2023
1 parent 55b705d commit 5cd4040
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 68 deletions.
5 changes: 4 additions & 1 deletion executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
}
}
if e.rowChunks.NumRow() > 0 {
e.rowChunks.Sort()
err := e.rowChunks.Sort()
if err != nil {
return err
}
e.partitionList = append(e.partitionList, e.rowChunks)
}
return nil
Expand Down
160 changes: 93 additions & 67 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package chunk

import (
"errors"
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -73,6 +74,11 @@ func (m *mutexForRowContainer) RUnlock() {
m.rLock.RUnlock()
}

type spillHelper interface {
SpillToDisk()
hasEnoughDataToSpill(t *memory.Tracker) bool
}

// RowContainer provides a place for many rows, so many that we might want to spill them into disk.
// nolint:structcheck
type RowContainer struct {
Expand Down Expand Up @@ -118,6 +124,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer {

// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *RowContainer) SpillToDisk() {
c.spillToDisk(nil)
}

func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool {
return true
}

func (c *RowContainer) spillToDisk(preSpillError error) {
c.m.Lock()
defer c.m.Unlock()
if c.alreadySpilled() {
Expand All @@ -137,6 +151,22 @@ func (c *RowContainer) SpillToDisk() {
N := c.m.records.inMemory.NumChunks()
c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
c.m.records.spillError = err
logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err))
}
}()
failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {
if val.(bool) {
panic("out of disk quota when spilling")
}
})
if preSpillError != nil {
c.m.records.spillError = preSpillError
return
}
for i := 0; i < N; i++ {
chk := c.m.records.inMemory.GetChunk(i)
err = c.m.records.inDisk.Add(chk)
Expand Down Expand Up @@ -291,8 +321,9 @@ func (c *RowContainer) Close() (err error) {
func (c *RowContainer) ActionSpill() *SpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SpillDiskAction{
c: c,
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}
c: c,
baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}},
}
}
return c.actionSpill
}
Expand All @@ -301,23 +332,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction {
func (c *RowContainer) ActionSpillForTest() *SpillDiskAction {
c.actionSpill = &SpillDiskAction{
c: c,
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
baseSpillDiskAction: &baseSpillDiskAction{
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
}
return c.actionSpill
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
type baseSpillDiskAction struct {
memory.BaseOOMAction
c *RowContainer
m sync.Mutex
once sync.Once
cond spillStatusCond
Expand All @@ -328,6 +357,20 @@ type SpillDiskAction struct {
testWg sync.WaitGroup
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
c *RowContainer
*baseSpillDiskAction
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
a.action(t, a.c)
}

type spillStatusCond struct {
*sync.Cond
// status indicates different stages for the Action
Expand All @@ -345,38 +388,35 @@ const (
spilledYet
)

func (a *SpillDiskAction) setStatus(status spillStatus) {
func (a *baseSpillDiskAction) setStatus(status spillStatus) {
a.cond.L.Lock()
defer a.cond.L.Unlock()
a.cond.status = status
}

func (a *SpillDiskAction) getStatus() spillStatus {
func (a *baseSpillDiskAction) getStatus() spillStatus {
a.cond.L.Lock()
defer a.cond.L.Unlock()
return a.cond.status
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) {
a.m.Lock()
defer a.m.Unlock()

if a.getStatus() == notSpilled {
if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.SpillToDisk()
spillHelper.SpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.SpillToDisk()
go spillHelper.SpillToDisk()
})
return
}
Expand All @@ -396,7 +436,7 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) {
}

// Reset resets the status for SpillDiskAction.
func (a *SpillDiskAction) Reset() {
func (a *baseSpillDiskAction) Reset() {
a.m.Lock()
defer a.m.Unlock()
a.setStatus(notSpilled)
Expand All @@ -407,12 +447,12 @@ func (a *SpillDiskAction) Reset() {
func (a *SpillDiskAction) SetLogHook(hook func(uint64)) {}

// GetPriority get the priority of the Action.
func (a *SpillDiskAction) GetPriority() int64 {
func (a *baseSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// WaitForTest waits all goroutine have gone.
func (a *SpillDiskAction) WaitForTest() {
func (a *baseSpillDiskAction) WaitForTest() {
a.testWg.Wait()
}

Expand Down Expand Up @@ -479,9 +519,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool {
}

// Sort inits pointers and sorts the records.
func (c *SortedRowContainer) Sort() {
func (c *SortedRowContainer) Sort() (ret error) {
c.ptrM.Lock()
defer c.ptrM.Unlock()
ret = nil
defer func() {
if r := recover(); r != nil {
ret = fmt.Errorf("%v", r)
}
}()
if c.ptrM.rowPtrs != nil {
return
}
Expand All @@ -496,13 +542,25 @@ func (c *SortedRowContainer) Sort() {
c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) {
if val.(bool) {
panic("sort meet error")
}
})
sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess)
c.GetMemTracker().Consume(int64(8 * c.numRow))
return
}

func (c *SortedRowContainer) sortAndSpillToDisk() {
c.Sort()
c.RowContainer.SpillToDisk()
// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *SortedRowContainer) SpillToDisk() {
err := c.Sort()
c.RowContainer.spillToDisk(err)
}

func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool {
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
return c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10
}

// Add appends a chunk into the SortedRowContainer.
Expand All @@ -527,8 +585,8 @@ func (c *SortedRowContainer) GetSortedRow(idx int) (Row, error) {
func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpill(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction,
}
}
return c.actionSpill
Expand All @@ -537,8 +595,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
// ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test.
func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpillForTest(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction,
}
return c.actionSpill
}
Expand All @@ -548,45 +606,13 @@ func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
// triggered.
type SortAndSpillDiskAction struct {
c *SortedRowContainer
*SpillDiskAction
*baseSpillDiskAction
}

// Action sends a signal to trigger sortAndSpillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) {
a.m.Lock()
defer a.m.Unlock()
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.sortAndSpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.sortAndSpillToDisk()
})
return
}

a.cond.L.Lock()
for a.cond.status == spilling {
a.cond.Wait()
}
a.cond.L.Unlock()

if !t.CheckExceed() {
return
}
if fallback := a.GetFallback(); fallback != nil {
fallback.Action(t)
}
a.action(t, a.c)
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
Expand Down
67 changes: 67 additions & 0 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,70 @@ func TestActionBlocked(t *testing.T) {
ac.Action(tracker)
require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond)
}

func TestPanicWhenSpillToDisk(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
sz := 20
chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}

rc := NewRowContainer(fields, sz)
tracker := rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota"))
}()
require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err := rc.GetRow(RowPtr{})
require.EqualError(t, err, "out of disk quota when spilling")
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
}

func TestPanicDuringSortedRowContainerSpill(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
byItemsDesc := []bool{false}
keyColumns := []int{0}
keyCmpFuncs := []CompareFunc{cmpInt64}
sz := 20
rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs)

chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}
var tracker *memory.Tracker
var err error
tracker = rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())
err = rc.Add(chk)
require.NoError(t, err)
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer"))
}()
err = rc.Add(chk)
require.NoError(t, err)
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err = rc.GetRow(RowPtr{})
require.EqualError(t, err, "sort meet error")
}

0 comments on commit 5cd4040

Please sign in to comment.