diff --git a/executor/sort.go b/executor/sort.go index d60f45afc0ca4..cce2fecc619a1 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -227,7 +227,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error { } } if e.rowChunks.NumRow() > 0 { - e.rowChunks.Sort() + err := e.rowChunks.Sort() + if err != nil { + return err + } e.partitionList = append(e.partitionList, e.rowChunks) } return nil diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 3407022f4718f..873c4a6b6b82f 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -16,6 +16,7 @@ package chunk import ( "errors" + "fmt" "sort" "sync" "time" @@ -73,6 +74,11 @@ func (m *mutexForRowContainer) RUnlock() { m.rLock.RUnlock() } +type spillHelper interface { + SpillToDisk() + hasEnoughDataToSpill(t *memory.Tracker) bool +} + // RowContainer provides a place for many rows, so many that we might want to spill them into disk. // nolint:structcheck type RowContainer struct { @@ -118,6 +124,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer { // SpillToDisk spills data to disk. This function may be called in parallel. func (c *RowContainer) SpillToDisk() { + c.spillToDisk(nil) +} + +func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool { + return true +} + +func (c *RowContainer) spillToDisk(preSpillError error) { c.m.Lock() defer c.m.Unlock() if c.alreadySpilled() { @@ -137,6 +151,22 @@ func (c *RowContainer) SpillToDisk() { N := c.m.records.inMemory.NumChunks() c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes()) c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker) + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("%v", r) + c.m.records.spillError = err + logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err)) + } + }() + failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) { + if val.(bool) { + panic("out of disk quota when spilling") + } + }) + if preSpillError != nil { + c.m.records.spillError = preSpillError + return + } for i := 0; i < N; i++ { chk := c.m.records.inMemory.GetChunk(i) err = c.m.records.inDisk.Add(chk) @@ -291,8 +321,9 @@ func (c *RowContainer) Close() (err error) { func (c *RowContainer) ActionSpill() *SpillDiskAction { if c.actionSpill == nil { c.actionSpill = &SpillDiskAction{ - c: c, - cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}} + c: c, + baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}, + } } return c.actionSpill } @@ -301,23 +332,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction { func (c *RowContainer) ActionSpillForTest() *SpillDiskAction { c.actionSpill = &SpillDiskAction{ c: c, - testSyncInputFunc: func() { - c.actionSpill.testWg.Add(1) + baseSpillDiskAction: &baseSpillDiskAction{ + testSyncInputFunc: func() { + c.actionSpill.testWg.Add(1) + }, + testSyncOutputFunc: func() { + c.actionSpill.testWg.Done() + }, + cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}, }, - testSyncOutputFunc: func() { - c.actionSpill.testWg.Done() - }, - cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}, } return c.actionSpill } -// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If -// the memory quota of a query is exceeded, SpillDiskAction.Action is -// triggered. -type SpillDiskAction struct { +type baseSpillDiskAction struct { memory.BaseOOMAction - c *RowContainer m sync.Mutex once sync.Once cond spillStatusCond @@ -328,6 +357,20 @@ type SpillDiskAction struct { testWg sync.WaitGroup } +// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If +// the memory quota of a query is exceeded, SpillDiskAction.Action is +// triggered. +type SpillDiskAction struct { + c *RowContainer + *baseSpillDiskAction +} + +// Action sends a signal to trigger spillToDisk method of RowContainer +// and if it is already triggered before, call its fallbackAction. +func (a *SpillDiskAction) Action(t *memory.Tracker) { + a.action(t, a.c) +} + type spillStatusCond struct { *sync.Cond // status indicates different stages for the Action @@ -345,38 +388,35 @@ const ( spilledYet ) -func (a *SpillDiskAction) setStatus(status spillStatus) { +func (a *baseSpillDiskAction) setStatus(status spillStatus) { a.cond.L.Lock() defer a.cond.L.Unlock() a.cond.status = status } -func (a *SpillDiskAction) getStatus() spillStatus { +func (a *baseSpillDiskAction) getStatus() spillStatus { a.cond.L.Lock() defer a.cond.L.Unlock() return a.cond.status } -// Action sends a signal to trigger spillToDisk method of RowContainer -// and if it is already triggered before, call its fallbackAction. -func (a *SpillDiskAction) Action(t *memory.Tracker) { +func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) { a.m.Lock() defer a.m.Unlock() - if a.getStatus() == notSpilled { + if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) { a.once.Do(func() { logutil.BgLogger().Info("memory exceeds quota, spill to disk now.", zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit())) if a.testSyncInputFunc != nil { a.testSyncInputFunc() - c := a.c go func() { - c.SpillToDisk() + spillHelper.SpillToDisk() a.testSyncOutputFunc() }() return } - go a.c.SpillToDisk() + go spillHelper.SpillToDisk() }) return } @@ -396,7 +436,7 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) { } // Reset resets the status for SpillDiskAction. -func (a *SpillDiskAction) Reset() { +func (a *baseSpillDiskAction) Reset() { a.m.Lock() defer a.m.Unlock() a.setStatus(notSpilled) @@ -407,12 +447,12 @@ func (a *SpillDiskAction) Reset() { func (a *SpillDiskAction) SetLogHook(hook func(uint64)) {} // GetPriority get the priority of the Action. -func (a *SpillDiskAction) GetPriority() int64 { +func (a *baseSpillDiskAction) GetPriority() int64 { return memory.DefSpillPriority } // WaitForTest waits all goroutine have gone. -func (a *SpillDiskAction) WaitForTest() { +func (a *baseSpillDiskAction) WaitForTest() { a.testWg.Wait() } @@ -479,9 +519,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool { } // Sort inits pointers and sorts the records. -func (c *SortedRowContainer) Sort() { +func (c *SortedRowContainer) Sort() (ret error) { c.ptrM.Lock() defer c.ptrM.Unlock() + ret = nil + defer func() { + if r := recover(); r != nil { + ret = fmt.Errorf("%v", r) + } + }() if c.ptrM.rowPtrs != nil { return } @@ -496,13 +542,25 @@ func (c *SortedRowContainer) Sort() { c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) } } + failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) { + if val.(bool) { + panic("sort meet error") + } + }) sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess) c.GetMemTracker().Consume(int64(8 * c.numRow)) + return } -func (c *SortedRowContainer) sortAndSpillToDisk() { - c.Sort() - c.RowContainer.SpillToDisk() +// SpillToDisk spills data to disk. This function may be called in parallel. +func (c *SortedRowContainer) SpillToDisk() { + err := c.Sort() + c.RowContainer.spillToDisk(err) +} + +func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool { + // Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files. + return c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 } // Add appends a chunk into the SortedRowContainer. @@ -527,8 +585,8 @@ func (c *SortedRowContainer) GetSortedRow(idx int) (Row, error) { func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction { if c.actionSpill == nil { c.actionSpill = &SortAndSpillDiskAction{ - c: c, - SpillDiskAction: c.RowContainer.ActionSpill(), + c: c, + baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction, } } return c.actionSpill @@ -537,8 +595,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction { // ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test. func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction { c.actionSpill = &SortAndSpillDiskAction{ - c: c, - SpillDiskAction: c.RowContainer.ActionSpillForTest(), + c: c, + baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction, } return c.actionSpill } @@ -548,45 +606,13 @@ func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction { // triggered. type SortAndSpillDiskAction struct { c *SortedRowContainer - *SpillDiskAction + *baseSpillDiskAction } // Action sends a signal to trigger sortAndSpillToDisk method of RowContainer // and if it is already triggered before, call its fallbackAction. func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) { - a.m.Lock() - defer a.m.Unlock() - // Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files. - if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 { - a.once.Do(func() { - logutil.BgLogger().Info("memory exceeds quota, spill to disk now.", - zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit())) - if a.testSyncInputFunc != nil { - a.testSyncInputFunc() - c := a.c - go func() { - c.sortAndSpillToDisk() - a.testSyncOutputFunc() - }() - return - } - go a.c.sortAndSpillToDisk() - }) - return - } - - a.cond.L.Lock() - for a.cond.status == spilling { - a.cond.Wait() - } - a.cond.L.Unlock() - - if !t.CheckExceed() { - return - } - if fallback := a.GetFallback(); fallback != nil { - fallback.Action(t) - } + a.action(t, a.c) } // SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go index 2bf8ef26644bc..4798e77076017 100644 --- a/util/chunk/row_container_test.go +++ b/util/chunk/row_container_test.go @@ -303,3 +303,70 @@ func TestActionBlocked(t *testing.T) { ac.Action(tracker) require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond) } + +func TestPanicWhenSpillToDisk(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + sz := 20 + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + + rc := NewRowContainer(fields, sz) + tracker := rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota")) + }() + require.NoError(t, rc.Add(chk)) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err := rc.GetRow(RowPtr{}) + require.EqualError(t, err, "out of disk quota when spilling") + require.EqualError(t, rc.Add(chk), "out of disk quota when spilling") +} + +func TestPanicDuringSortedRowContainerSpill(t *testing.T) { + fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + byItemsDesc := []bool{false} + keyColumns := []int{0} + keyCmpFuncs := []CompareFunc{cmpInt64} + sz := 20 + rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs) + + chk := NewChunkWithCapacity(fields, sz) + for i := 0; i < sz; i++ { + chk.AppendInt64(0, int64(i)) + } + var tracker *memory.Tracker + var err error + tracker = rc.GetMemTracker() + tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1) + tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest()) + require.False(t, rc.AlreadySpilledSafeForTest()) + err = rc.Add(chk) + require.NoError(t, err) + rc.actionSpill.WaitForTest() + require.False(t, rc.AlreadySpilledSafeForTest()) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer")) + }() + err = rc.Add(chk) + require.NoError(t, err) + rc.actionSpill.WaitForTest() + require.True(t, rc.AlreadySpilledSafeForTest()) + + _, err = rc.GetRow(RowPtr{}) + require.EqualError(t, err, "sort meet error") +}