From 042a332aae6f31ae0d0ee929faadbd74b45cc685 Mon Sep 17 00:00:00 2001 From: CbcWestwolf <1004626265@qq.com> Date: Mon, 23 Dec 2024 17:12:06 +0800 Subject: [PATCH] metrics: add col/idx name(s) for BackfillProgressGauge and BackfillTotalCounter (#58380) close pingcap/tidb#58114 --- pkg/ddl/backfilling.go | 59 +++++++++++++++++++++++-------- pkg/ddl/backfilling_operators.go | 6 ---- pkg/ddl/backfilling_read_index.go | 22 +++++++++--- pkg/ddl/backfilling_scheduler.go | 5 +-- pkg/ddl/column.go | 26 +++++++++----- pkg/ddl/index.go | 2 +- pkg/ddl/modify_column.go | 2 +- pkg/ddl/partition.go | 8 ++--- pkg/ddl/reorg.go | 20 +++++++++-- pkg/metrics/ddl.go | 43 ++++++++++++++++------ 10 files changed, 137 insertions(+), 56 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index e3a63f6aa3680..f98897325cb7a 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -194,23 +195,49 @@ func newBackfillCtx(id int, rInfo *reorgInfo, id = int(backfillContextID.Add(1)) } + colOrIdxName := "" + switch rInfo.Job.Type { + case model.ActionAddIndex, model.ActionAddPrimaryKey: + args, err := model.GetModifyIndexArgs(rInfo.Job) + if err != nil { + logutil.DDLLogger().Error("Fail to get ModifyIndexArgs", zap.String("label", label), zap.String("schemaName", schemaName), zap.String("tableName", tbl.Meta().Name.String())) + } else { + colOrIdxName = getIdxNamesFromArgs(args) + } + case model.ActionModifyColumn: + oldCol, _ := getOldAndNewColumnsForUpdateColumn(tbl, rInfo.currElement.ID) + if oldCol != nil { + colOrIdxName = oldCol.Name.String() + } + } + batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) return &backfillCtx{ - id: id, - ddlCtx: rInfo.jobCtx.oldDDLCtx, - warnings: warnHandler, - exprCtx: exprCtx, - tblCtx: tblCtx, - loc: exprCtx.GetEvalCtx().Location(), - schemaName: schemaName, - table: tbl, - batchCnt: batchCnt, - jobContext: jobCtx, - metricCounter: metrics.BackfillTotalCounter.WithLabelValues( - metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())), + id: id, + ddlCtx: rInfo.jobCtx.oldDDLCtx, + warnings: warnHandler, + exprCtx: exprCtx, + tblCtx: tblCtx, + loc: exprCtx.GetEvalCtx().Location(), + schemaName: schemaName, + table: tbl, + batchCnt: batchCnt, + jobContext: jobCtx, + metricCounter: metrics.GetBackfillTotalByLabel(label, schemaName, tbl.Meta().Name.String(), colOrIdxName), }, nil } +func getIdxNamesFromArgs(args *model.ModifyIndexArgs) string { + var sb strings.Builder + for i, idx := range args.IndexArgs { + if i > 0 { + sb.WriteString("+") + } + sb.WriteString(idx.IndexName.O) + } + return sb.String() +} + func updateTxnEntrySizeLimitIfNeeded(txn kv.Transaction) { if entrySizeLimit := variable.TxnEntrySizeLimit.Load(); entrySizeLimit > 0 { txn.SetOption(kv.SizeLimits, kv.TxnSizeLimits{ @@ -686,6 +713,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( idxCnt := len(reorgInfo.elements) indexIDs := make([]int64, 0, idxCnt) indexInfos := make([]*model.IndexInfo, 0, idxCnt) + var indexNames strings.Builder uniques := make([]bool, 0, idxCnt) hasUnique := false for _, e := range reorgInfo.elements { @@ -699,6 +727,10 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return errors.Errorf("index info not found: %d", e.ID) } indexInfos = append(indexInfos, indexInfo) + if indexNames.Len() > 0 { + indexNames.WriteString("+") + } + indexNames.WriteString(indexInfo.Name.O) uniques = append(uniques, 
indexInfo.Unique) hasUnique = hasUnique || indexInfo.Unique } @@ -736,8 +768,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( rowCntListener := &localRowCntListener{ prevPhysicalRowCnt: reorgCtx.getRowCount(), reorgCtx: reorgCtx, - counter: metrics.BackfillTotalCounter.WithLabelValues( - metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)), + counter: metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, job.SchemaName, job.TableName, indexNames.String()), } sctx, err := sessPool.Get() diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index d38582ebd2f1c..0d5f324c917ec 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -745,10 +745,6 @@ func NewIndexIngestOperator( writers = append(writers, writer) } - indexIDs := make([]int64, len(indexes)) - for i := 0; i < len(indexes); i++ { - indexIDs[i] = indexes[i].Meta().ID - } return &indexIngestLocalWorker{ indexIngestBaseWorker: indexIngestBaseWorker{ ctx: ctx, @@ -762,7 +758,6 @@ func NewIndexIngestOperator( srcChunkPool: srcChunkPool, reorgMeta: reorgMeta, }, - indexIDs: indexIDs, backendCtx: backendCtx, rowCntListener: rowCntListener, cpMgr: cpMgr, @@ -793,7 +788,6 @@ func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(In type indexIngestLocalWorker struct { indexIngestBaseWorker - indexIDs []int64 backendCtx ingest.BackendCtx rowCntListener RowCountListener cpMgr *ingest.CheckpointManager diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index db492ce3f4450..215f53caaa777 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -18,6 +18,7 @@ import ( "context" "encoding/hex" "encoding/json" + "strings" "sync" "sync/atomic" @@ -224,9 +225,14 @@ func (r *readIndexExecutor) buildLocalStorePipeline( d := r.d indexIDs := make([]int64, 0, len(r.indexes)) uniques := make([]bool, 0, len(r.indexes)) + var idxNames strings.Builder for _, index := range r.indexes { indexIDs = append(indexIDs, index.ID) uniques = append(uniques, index.Unique) + if idxNames.Len() > 0 { + idxNames.WriteByte('+') + } + idxNames.WriteString(index.Name.O) } engines, err := r.bc.Register(indexIDs, uniques, r.ptbl) if err != nil { @@ -236,7 +242,7 @@ func (r *readIndexExecutor) buildLocalStorePipeline( zap.Int64s("index IDs", indexIDs)) return nil, err } - rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O) + rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String()) return NewAddIndexIngestPipeline( opCtx, d.store, @@ -280,7 +286,14 @@ func (r *readIndexExecutor) buildExternalStorePipeline( kvMeta.MergeSummary(summary) s.mu.Unlock() } - rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O) + var idxNames strings.Builder + for _, idx := range r.indexes { + if idxNames.Len() > 0 { + idxNames.WriteByte('+') + } + idxNames.WriteString(idx.Name.O) + } + rowCntListener := newDistTaskRowCntListener(r.curRowCount, r.job.SchemaName, tbl.Meta().Name.O, idxNames.String()) return NewWriteIndexToExternalStoragePipeline( opCtx, d.store, @@ -307,9 +320,8 @@ type distTaskRowCntListener struct { counter prometheus.Counter } -func newDistTaskRowCntListener(totalRowCnt *atomic.Int64, dbName, tblName string) *distTaskRowCntListener { - counter := metrics.BackfillTotalCounter.WithLabelValues( - metrics.GenerateReorgLabel("add_idx_rate", dbName, tblName)) +func 
newDistTaskRowCntListener(totalRowCnt *atomic.Int64, dbName, tblName, idxName string) *distTaskRowCntListener { + counter := metrics.GetBackfillTotalByLabel(metrics.LblAddIdxRate, dbName, tblName, idxName) return &distTaskRowCntListener{ totalRowCount: totalRowCnt, counter: counter, diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index ddf78334e2200..ae118c8e09328 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/resourcegroup" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" @@ -271,7 +272,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { ) switch b.tp { case typeAddIndexWorker: - backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "add_idx_rate", false, false) + backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblAddIdxRate, false, false) if err != nil { return err } @@ -284,7 +285,7 @@ func (b *txnBackfillScheduler) adjustWorkerSize() error { runner = newBackfillWorker(b.ctx, idxWorker) worker = idxWorker case typeAddIndexMergeTmpWorker: - backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false, false) + backfillCtx, err := newBackfillCtx(i, reorgInfo, job.SchemaName, b.tbl, jc, metrics.LblMergeTmpIdxRate, false, false) if err != nil { return err } diff --git a/pkg/ddl/column.go b/pkg/ddl/column.go index 11e359f45a641..237538456886e 100644 --- a/pkg/ddl/column.go +++ b/pkg/ddl/column.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -609,8 +610,22 @@ type updateColumnWorker struct { checksumNeeded bool } +func getOldAndNewColumnsForUpdateColumn(t table.Table, currElementID int64) (oldCol, newCol *model.ColumnInfo) { + for _, col := range t.WritableCols() { + if col.ID == currElementID { + changeColumnOrigName := table.FindCol(t.Cols(), getChangingColumnOriginName(col.ColumnInfo)) + if changeColumnOrigName != nil { + newCol = col.ColumnInfo + oldCol = changeColumnOrigName.ColumnInfo + return + } + } + } + return +} + func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*updateColumnWorker, error) { - bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "update_col_rate", false, true) + bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblUpdateColRate, false, true) if err != nil { return nil, err } @@ -620,14 +635,7 @@ func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64 zap.Stringer("reorgInfo", reorgInfo)) return nil, nil } - var oldCol, newCol *model.ColumnInfo - for _, col := range t.WritableCols() { - if col.ID == reorgInfo.currElement.ID { - newCol = col.ColumnInfo - oldCol = table.FindCol(t.Cols(), getChangingColumnOriginName(newCol)).ColumnInfo - break - } - } + oldCol, newCol := getOldAndNewColumnsForUpdateColumn(t, reorgInfo.currElement.ID) rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) 
failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() { orig := variable.EnableRowLevelChecksum.Load() diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 9c0e7296879cc..f3f2a83f9a8fb 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2921,7 +2921,7 @@ type cleanUpIndexWorker struct { } func newCleanUpIndexWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*cleanUpIndexWorker, error) { - bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, "cleanup_idx_rate", false, false) + bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblCleanupIdxRate, false, false) if err != nil { return nil, err } diff --git a/pkg/ddl/modify_column.go b/pkg/ddl/modify_column.go index 8de8adced1733..09a5baf485fd0 100644 --- a/pkg/ddl/modify_column.go +++ b/pkg/ddl/modify_column.go @@ -502,7 +502,7 @@ func (w *worker) doModifyColumnTypeWithData( // Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will // be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`. job.SchemaState = model.StateDeleteOnly - metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String()).Set(0) + metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String(), args.OldColumnName.O).Set(0) args.ChangingColumn = changingCol args.ChangingIdxs = changingIdxs failpoint.InjectCall("modifyColumnTypeWithData", job, args) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 580acf932b7a4..46e36efbb5f60 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -3334,7 +3334,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver } // Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that. 
- metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64)) + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.1 / float64(math.MaxUint64)) job.SchemaState = model.StateDeleteOnly tblInfo.Partition.DDLState = job.SchemaState ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true) @@ -3398,7 +3398,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver } } tblInfo.Partition.DDLState = model.StateWriteOnly - metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64)) + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.2 / float64(math.MaxUint64)) failpoint.Inject("reorgPartRollback2", func(val failpoint.Value) { if val.(bool) { err = errors.New("Injected error by reorgPartRollback2") @@ -3419,7 +3419,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver } job.SchemaState = model.StateWriteReorganization tblInfo.Partition.DDLState = job.SchemaState - metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64)) + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String(), "").Set(0.3 / float64(math.MaxUint64)) ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) case model.StateWriteReorganization: physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions) @@ -3813,7 +3813,7 @@ type reorgPartitionWorker struct { } func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) { - bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false, false) + bCtx, err := newBackfillCtx(i, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblReorgPartitionRate, false, false) if err != nil { return nil, err } diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go index ed7e28852c52d..2ed1ac5811efe 100644 --- a/pkg/ddl/reorg.go +++ b/pkg/ddl/reorg.go @@ -511,12 +511,26 @@ func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.Tabl } else { label = metrics.LblAddIndex } - metrics.GetBackfillProgressByLabel(label, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100) + idxNames := "" + args, err := model.GetModifyIndexArgs(reorgInfo.Job) + if err != nil { + logutil.DDLLogger().Error("Fail to get ModifyIndexArgs", zap.Error(err)) + } else { + idxNames = getIdxNamesFromArgs(args) + } + metrics.GetBackfillProgressByLabel(label, reorgInfo.SchemaName, tblInfo.Name.String(), idxNames).Set(progress * 100) case model.ActionModifyColumn: - metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100) + colName := "" + args, err := model.GetModifyColumnArgs(reorgInfo.Job) + if err != nil { + logutil.DDLLogger().Error("Fail to get ModifyColumnArgs", zap.Error(err)) + } else { + colName = args.OldColumnName.O + } + metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, reorgInfo.SchemaName, tblInfo.Name.String(), colName).Set(progress * 100) case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: - 
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, reorgInfo.SchemaName, tblInfo.Name.String()).Set(progress * 100) + metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, reorgInfo.SchemaName, tblInfo.Name.String(), "").Set(progress * 100) } } diff --git a/pkg/metrics/ddl.go b/pkg/metrics/ddl.go index 4c721a5cda123..834d25db893fe 100644 --- a/pkg/metrics/ddl.go +++ b/pkg/metrics/ddl.go @@ -212,26 +212,47 @@ func InitDDLMetrics() { const ( LblAction = "action" - LblAddIndex = "add_index" - LblAddIndexMerge = "add_index_merge_tmp" - LblModifyColumn = "modify_column" - + // Used by BackfillProgressGauge + LblAddIndex = "add_index" + LblAddIndexMerge = "add_index_merge_tmp" + LblModifyColumn = "modify_column" LblReorgPartition = "reorganize_partition" + + // Used by BackfillTotalCounter + LblAddIdxRate = "add_idx_rate" + LblMergeTmpIdxRate = "merge_tmp_idx_rate" + LblCleanupIdxRate = "cleanup_idx_rate" + LblUpdateColRate = "update_col_rate" + LblReorgPartitionRate = "reorg_partition_rate" ) -// GenerateReorgLabel returns the label with schema name and table name. -func GenerateReorgLabel(label string, schemaName string, tableName string) string { +// generateReorgLabel returns the label with schema name, table name and optional column/index names. +// Multiple columns/indexes can be concatenated with "+". +func generateReorgLabel(label, schemaName, tableName, colOrIdxNames string) string { var stringBuilder strings.Builder - stringBuilder.Grow(len(label) + len(schemaName) + len(tableName) + 2) + if len(colOrIdxNames) == 0 { + stringBuilder.Grow(len(label) + len(schemaName) + len(tableName) + 2) + } else { + stringBuilder.Grow(len(label) + len(schemaName) + len(tableName) + len(colOrIdxNames) + 3) + } stringBuilder.WriteString(label) - stringBuilder.WriteString("_") + stringBuilder.WriteString("-") stringBuilder.WriteString(schemaName) - stringBuilder.WriteString("_") + stringBuilder.WriteString("-") stringBuilder.WriteString(tableName) + if len(colOrIdxNames) > 0 { + stringBuilder.WriteString("-") + stringBuilder.WriteString(colOrIdxNames) + } return stringBuilder.String() } +// GetBackfillTotalByLabel returns the Counter showing the speed of backfilling for the given type label. +func GetBackfillTotalByLabel(label, schemaName, tableName, optionalColOrIdxName string) prometheus.Counter { + return BackfillTotalCounter.WithLabelValues(generateReorgLabel(label, schemaName, tableName, optionalColOrIdxName)) +} + // GetBackfillProgressByLabel returns the Gauge showing the percentage progress for the given type label. -func GetBackfillProgressByLabel(label string, schemaName string, tableName string) prometheus.Gauge { - return BackfillProgressGauge.WithLabelValues(GenerateReorgLabel(label, schemaName, tableName)) +func GetBackfillProgressByLabel(label, schemaName, tableName, optionalColOrIdxName string) prometheus.Gauge { + return BackfillProgressGauge.WithLabelValues(generateReorgLabel(label, schemaName, tableName, optionalColOrIdxName)) }
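
Note on the new label format (commentary, not part of the patch): the sketch below is a minimal, standalone illustration of the label string that generateReorgLabel now builds, i.e. the type label, schema name, table name, and an optional "+"-joined column/index list, all separated by "-". The reorgLabel helper and the main function here are hypothetical stand-ins; only the label values, the "+" joining, and the "-" separator come from the patch above.

package main

import (
	"fmt"
	"strings"
)

// reorgLabel mirrors the format produced by generateReorgLabel in
// pkg/metrics/ddl.go after this patch: the type label, schema name, table
// name, and an optional "+"-joined column/index list, all joined by "-".
// This helper is illustrative only; the real code lives in the metrics package.
func reorgLabel(label, schemaName, tableName, colOrIdxNames string) string {
	parts := []string{label, schemaName, tableName}
	if colOrIdxNames != "" {
		parts = append(parts, colOrIdxNames)
	}
	return strings.Join(parts, "-")
}

func main() {
	// Adding two indexes in one DDL job: the index names are concatenated
	// with "+", as getIdxNamesFromArgs does in pkg/ddl/backfilling.go.
	fmt.Println(reorgLabel("add_idx_rate", "test", "t", "idx_a+idx_b"))
	// add_idx_rate-test-t-idx_a+idx_b

	// Reorganize-partition call sites pass an empty name, so the label keeps
	// only the first three components.
	fmt.Println(reorgLabel("reorganize_partition", "test", "t", ""))
	// reorganize_partition-test-t
}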