Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: set the job history status to cancelled if it's removed in GC and it's still running (#58539) #58711

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_stat

const timeFormat = time.DateTime

// ttlJobHistoryGCNonExistTableTemplate marks job history rows as 'cancelled' when they are still
// 'running' but their table_id no longer appears in mysql.tidb_ttl_table_status.
//
// The rows for non-existent tables are deliberately not removed directly. In some special situations,
// the TTL job may still be able to finish correctly. If that happens, the status will be updated from
// 'cancelled' to 'finished' in `(*ttlJob).finish`.
const ttlJobHistoryGCNonExistTableTemplate = `UPDATE mysql.tidb_ttl_job_history SET status = 'cancelled'
WHERE table_id NOT IN (SELECT table_id FROM mysql.tidb_ttl_table_status) AND status = 'running'`

// insertNewTableIntoStatusSQL pairs insertNewTableIntoStatusTemplate with its positional
// arguments: the table ID first, followed by the parent table ID.
func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []any) {
	args := []any{tableID, parentTableID}
	return insertNewTableIntoStatusTemplate, args
}
Expand Down Expand Up @@ -1055,6 +1060,10 @@ func (m *JobManager) DoGC(ctx context.Context, se session.Session, now time.Time
if _, err := se.ExecuteSQL(ctx, ttlJobHistoryGCTemplate); err != nil {
logutil.Logger(ctx).Warn("fail to gc ttl job history", zap.Error(err))
}

if _, err := se.ExecuteSQL(ctx, ttlJobHistoryGCNonExistTableTemplate); err != nil {
logutil.Logger(ctx).Warn("fail to gc ttl job history for non-exist table", zap.Error(err))
}
}

// GetDelayMetricRecords gets the records of TTL delay metrics
Expand Down
48 changes: 48 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,3 +1565,51 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
tk.MustQuery("select current_job_status, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Check(testkit.Rows())
})
}

// TestTimerJobAfterDropTable checks the state left behind when a TTL table is dropped while its job
// is still recorded as running: after DoGC the table status row is gone, the job history row is
// 'cancelled', the manager holds no running jobs, and the job adapter reports the job as finished.
func TestTimerJobAfterDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)

pool := wrapPoolForTest(dom.SysSessionPool())

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (created_at datetime) TTL = created_at + INTERVAL 1 HOUR")
tbl, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
m := ttlworker.NewJobManager("test-job-manager", pool, store, nil, func() bool { return true })

se, err := ttlworker.GetSessionForTest(pool)
require.NoError(t, err)
defer se.Close()

// First, schedule the job. One row is expected in each of `tidb_ttl_table_status` and
// `tidb_ttl_job_history` afterwards.
jobID := "test-job-id"

require.NoError(t, m.InfoSchemaCache().Update(se))
err = m.SubmitJob(se, tbl.Meta().ID, tbl.Meta().ID, jobID)
require.NoError(t, err)
now := se.Now()
tk.MustQuery("select count(*) from mysql.tidb_ttl_table_status").Check(testkit.Rows("1"))
tk.MustQuery("select count(*) from mysql.tidb_ttl_job_history").Check(testkit.Rows("1"))

// Drop the table, then run GC with a timestamp two hours in the future.
// NOTE(review): this presumably simulates `m` having lost its heartbeat for longer than the
// heartbeat timeout; confirm against the heartbeat interval constant.
tk.MustExec("drop table t")

now = now.Add(time.Hour * 2)
m.DoGC(context.Background(), se, now)
// GC must delete the table status row and flip the history row from running to cancelled.
tk.MustQuery("select count(*) from mysql.tidb_ttl_table_status").Check(testkit.Rows("0"))
tk.MustQuery("select status from mysql.tidb_ttl_job_history").Check(testkit.Rows("cancelled"))

// After refreshing both caches and running the not-owned-job check, the manager should have
// no running jobs left.
require.NoError(t, m.TableStatusCache().Update(context.Background(), se))
require.NoError(t, m.InfoSchemaCache().Update(se))
m.CheckNotOwnJob()
require.Len(t, m.RunningJobs(), 0)

// The adapter should still return the job, and it must be reported as finished.
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)
job, err := adapter.GetJob(context.Background(), tbl.Meta().ID, tbl.Meta().ID, jobID)
require.NoError(t, err)
require.NotNil(t, job)
require.True(t, job.Finished)
}
5 changes: 5 additions & 0 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ func (m *JobManager) ReportMetrics(se session.Session) {
m.reportMetrics(se)
}

// CheckNotOwnJob is an exported version of checkNotOwnJob.
// It simply delegates to the unexported method so that test code can invoke it directly.
func (m *JobManager) CheckNotOwnJob() {
m.checkNotOwnJob()
}

// CheckFinishedJob is an exported version of checkFinishedJob
func (m *JobManager) CheckFinishedJob(se session.Session) {
m.checkFinishedJob(se)
Expand Down