Skip to content

Commit

Permalink
*: fix cte miss cleaning spilled-disk file (#44501) (#44527)
Browse files Browse the repository at this point in the history
close #44477
  • Loading branch information
ti-chi-bot authored Oct 10, 2023
1 parent 472fa31 commit 2d27ec6
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 36 deletions.
56 changes: 51 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,11 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {

func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
return err
err1 := a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
if err != nil {
return err
}
return err1
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
Expand Down Expand Up @@ -469,6 +472,13 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
if sc.DiskTracker != nil {
sc.DiskTracker.DetachFromGlobalTracker()
}
if handled {
cteErr := resetCTEStorageMap(a.Ctx)
if err == nil {
// Only overwrite err when it's nil.
err = cteErr
}
}
}
}()

Expand Down Expand Up @@ -562,8 +572,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
}

func (c *chunkRowRecordSet) Close() error {
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
return c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
Expand Down Expand Up @@ -970,7 +979,11 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) error {
cteErr := resetCTEStorageMap(a.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
// Detach the Memory and disk tracker for the previous stmtCtx from GlobalMemoryUsageTracker and GlobalDiskUsageTracker
Expand All @@ -982,6 +995,39 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
stmtCtx.MemTracker.DetachFromGlobalTracker()
}
}
return cteErr
}

// Clean CTE storage shared by different CTEFullScan executor within a SQL stmt.
// Will return err in two situations:
// 1. Got err when remove disk spill file.
// 2. Some logical error like ref count of CTEStorage is less than 0.
func resetCTEStorageMap(se sessionctx.Context) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// LogSlowQuery is used to print the slow query in the log files.
Expand Down
25 changes: 24 additions & 1 deletion executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,11 @@ func TestCTEWithLimit(t *testing.T) {
}

func TestSpillToDisk(t *testing.T) {
defer config.RestoreFunc()()
oriGlobalConfig := config.GetGlobalConfig()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
})
defer config.StoreGlobalConfig(oriGlobalConfig)

store, close := testkit.CreateMockStore(t)
defer close()
Expand Down Expand Up @@ -482,3 +483,25 @@ func TestCTEPanic(t *testing.T) {
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
}

func TestCTEDelSpillFile(t *testing.T) {
oriGlobalConfig := config.GetGlobalConfig()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
conf.OOMAction = config.OOMActionLog
})
defer config.StoreGlobalConfig(oriGlobalConfig)

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(c1 int, c2 int);")
tk.MustExec("create table t2(c1 int);")
tk.MustExec("set @@cte_max_recursion_depth = 1000000;")
tk.MustExec("set @@tidb_mem_quota_query = 100;")
tk.MustExec("insert into t2 values(1);")
tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;")
require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
}
8 changes: 8 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9705,6 +9705,7 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) {
p, _, err := planner.Optimize(context.TODO(), se, stmt, is)
c.Assert(err, IsNil, comment)
// This for loop level traverses the plan tree to get which operators are covered.
var hasCTE bool
for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; {
newChild := make([]plannercore.PhysicalPlan, 0, len(child))
for _, ch := range child {
Expand All @@ -9721,6 +9722,7 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) {
case *plannercore.PhysicalCTE:
newChild = append(newChild, x.RecurPlan)
newChild = append(newChild, x.SeedPlan)
hasCTE = true
continue
case *plannercore.PhysicalShuffle:
newChild = append(newChild, x.DataSources...)
Expand All @@ -9732,6 +9734,12 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) {
child = newChild
}

if hasCTE {
// Normally CTEStorages will be setup in ResetContextOfStmt.
// But the following case call e.Close() directly, instead of calling session.ExecStmt(), which calls ResetContextOfStmt.
// So need to setup CTEStorages manually.
se.GetSessionVars().StmtCtx.CTEStorageMap = map[int]*executor.CTEStorages{}
}
e := executorBuilder.Build(p)

func() {
Expand Down
30 changes: 0 additions & 30 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,39 +1903,9 @@ func (rs *execStmtResult) Close() error {
if err := rs.RecordSet.Close(); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
if err := resetCTEStorageMap(se); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
return finishStmt(context.Background(), se, nil, rs.sql)
}

func resetCTEStorageMap(se *session) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*executor.CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
// No need to lock IterInTbl.
v.ResTbl.Lock()
defer v.ResTbl.Unlock()
err1 := v.ResTbl.DerefAndClose()
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down

0 comments on commit 2d27ec6

Please sign in to comment.