Skip to content

Commit

Permalink
statistics: Do not create pseudo statistics for the auto-analysis check process (#51479)
Browse files Browse the repository at this point in the history

ref #50132
  • Loading branch information
Rustin170506 authored Mar 6, 2024
1 parent ad28e23 commit 9b255d5
Show file tree
Hide file tree
Showing 6 changed files with 625 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ go_test(
"queue_test.go",
],
flaky = True,
shard_count = 17,
shard_count = 22,
deps = [
":priorityqueue",
"//pkg/parser/model",
Expand Down
176 changes: 161 additions & 15 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (
type analyzeType string

const (
analyzeTable analyzeType = "analyzeTable"
analyzeIndex analyzeType = "analyzeIndex"
analyzePartition analyzeType = "analyzePartition"
analyzePartitionIndex analyzeType = "analyzePartitionIndex"
analyzeTable analyzeType = "analyzeTable"
analyzeIndex analyzeType = "analyzeIndex"
analyzeStaticPartition analyzeType = "analyzeStaticPartition"
analyzeStaticPartitionIndex analyzeType = "analyzeStaticPartitionIndex"
analyzePartition analyzeType = "analyzePartition"
analyzePartitionIndex analyzeType = "analyzePartitionIndex"
)

// defaultFailedAnalysisWaitTime is the default wait time for the next analysis after a failed analysis.
Expand All @@ -50,22 +52,103 @@ type TableAnalysisJob struct {
// and we don't want to analyze the same partition multiple times.
// For example, the user may analyze some partitions manually, and we don't want to analyze them again.
PartitionIndexes map[string][]string
TableSchema string
TableName string
// Only set when table's indexes need to be analyzed.

TableSchema string
// For physical or global tables.
TableName string
// For static pruned tables.
StaticPartitionName string
// Only set when table or static partition's indexes need to be analyzed.
// This is only for newly added indexes.
Indexes []string

// Only set when table's partitions need to be analyzed.
// This will analyze all indexes and columns of the specified partitions.
Partitions []string
TableID int64
Partitions []string
// The global table ID.
TableID int64
// The static partition ID.
StaticPartitionID int64

TableStatsVer int
ChangePercentage float64
TableSize float64
LastAnalysisDuration time.Duration
Weight float64
}

// NewStaticPartitionTableAnalysisJob builds a TableAnalysisJob that targets one static partition.
//
// schema, globalTableName and globalTableID identify the table that owns the partition;
// partitionName and partitionID identify the partition itself. indexes is stored as the
// job's Indexes list. The remaining arguments are copied verbatim into the fields of the
// same name. Fields that are not covered by an argument (Partitions, PartitionIndexes,
// Weight) keep their zero values.
func NewStaticPartitionTableAnalysisJob(
	schema, globalTableName string,
	globalTableID int64,
	partitionName string,
	partitionID int64,
	indexes []string,
	tableStatsVer int,
	changePercentage float64,
	tableSize float64,
	lastAnalysisDuration time.Duration,
) *TableAnalysisJob {
	job := new(TableAnalysisJob)

	// Identity of the owning table.
	job.TableSchema = schema
	job.TableName = globalTableName
	job.TableID = globalTableID

	// Identity of the static partition to analyze.
	job.StaticPartitionName = partitionName
	job.StaticPartitionID = partitionID

	// What to analyze and the statistics that describe the job.
	job.Indexes = indexes
	job.TableStatsVer = tableStatsVer
	job.ChangePercentage = changePercentage
	job.TableSize = tableSize
	job.LastAnalysisDuration = lastAnalysisDuration

	return job
}

// NewNonPartitionedTableAnalysisJob builds a TableAnalysisJob that is addressed purely by
// schema, table name and table ID.
//
// indexes is stored as the job's Indexes list, and the remaining arguments are copied
// verbatim into the fields of the same name. No static-partition or partition-list field
// is populated, and Weight keeps its zero value.
func NewNonPartitionedTableAnalysisJob(
	schema, tableName string,
	tableID int64,
	indexes []string,
	tableStatsVer int,
	changePercentage float64,
	tableSize float64,
	lastAnalysisDuration time.Duration,
) *TableAnalysisJob {
	job := new(TableAnalysisJob)

	// Identity of the table.
	job.TableSchema = schema
	job.TableName = tableName
	job.TableID = tableID

	// What to analyze and the statistics that describe the job.
	job.Indexes = indexes
	job.TableStatsVer = tableStatsVer
	job.ChangePercentage = changePercentage
	job.TableSize = tableSize
	job.LastAnalysisDuration = lastAnalysisDuration

	return job
}

// NewDynamicPartitionTableAnalysisJob builds a TableAnalysisJob for a dynamic partitioned table.
//
// partitions is stored as the job's Partitions list and partitionIndexes as its
// PartitionIndexes map. The remaining arguments are copied verbatim into the fields of
// the same name. Indexes, the static-partition fields and Weight keep their zero values.
func NewDynamicPartitionTableAnalysisJob(
	schema, tableName string,
	tableID int64,
	partitions []string,
	partitionIndexes map[string][]string,
	tableStatsVer int,
	changePercentage float64,
	tableSize float64,
	lastAnalysisDuration time.Duration,
) *TableAnalysisJob {
	job := new(TableAnalysisJob)

	// Identity of the table.
	job.TableSchema = schema
	job.TableName = tableName
	job.TableID = tableID

	// Partition-level work carried by this job.
	job.Partitions = partitions
	job.PartitionIndexes = partitionIndexes

	// Statistics that describe the job.
	job.TableStatsVer = tableStatsVer
	job.ChangePercentage = changePercentage
	job.TableSize = tableSize
	job.LastAnalysisDuration = lastAnalysisDuration

	return job
}

// HasNewlyAddedIndex checks whether the table has newly added index.
func (j *TableAnalysisJob) HasNewlyAddedIndex() bool {
return len(j.PartitionIndexes) > 0 || len(j.Indexes) > 0
Expand All @@ -79,9 +162,10 @@ func (j *TableAnalysisJob) IsValidToAnalyze(
sctx sessionctx.Context,
) (bool, string) {
// No need to analyze this table.
// TODO: Usually, we should not put this kind of table into the queue.
if j.Weight == 0 {
return false, "weight is 0"
// Usually, we should not put this kind of table into the queue.
// This is just a double check.
if j.Weight <= 0 {
return false, fmt.Sprintf("weight is less than or equal to 0: %.4f", j.Weight)
}

// Check whether the table or partition is valid to analyze.
Expand All @@ -97,6 +181,17 @@ func (j *TableAnalysisJob) IsValidToAnalyze(
); !valid {
return false, failReason
}
} else if j.StaticPartitionName != "" {
// For static partition table we only need to check the specified static partition.
partitionNames := []string{j.StaticPartitionName}
if valid, failReason := isValidToAnalyze(
sctx,
j.TableSchema,
j.TableName,
partitionNames...,
); !valid {
return false, failReason
}
} else {
if valid, failReason := isValidToAnalyze(
sctx,
Expand Down Expand Up @@ -219,7 +314,11 @@ func (j *TableAnalysisJob) Analyze(
case analyzePartition:
j.analyzePartitions(sctx, statsHandle, sysProcTracker)
case analyzePartitionIndex:
j.AnalyzePartitionIndexes(sctx, statsHandle, sysProcTracker)
j.analyzePartitionIndexes(sctx, statsHandle, sysProcTracker)
case analyzeStaticPartition:
j.analyzeStaticPartition(sctx, statsHandle, sysProcTracker)
case analyzeStaticPartitionIndex:
j.analyzeStaticPartitionIndexes(sctx, statsHandle, sysProcTracker)
}
}

Expand All @@ -230,7 +329,12 @@ func (j *TableAnalysisJob) getAnalyzeType() analyzeType {
case len(j.Partitions) > 0:
return analyzePartition
case len(j.Indexes) > 0:
if j.StaticPartitionName != "" {
return analyzeStaticPartitionIndex
}
return analyzeIndex
case j.StaticPartitionName != "":
return analyzeStaticPartition
default:
return analyzeTable
}
Expand All @@ -245,6 +349,15 @@ func (j *TableAnalysisJob) analyzeTable(
exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, sql, params...)
}

// analyzeStaticPartition runs a single auto-analyze statement for the job's static partition.
// The statement and its arguments come from GenSQLForAnalyzeStaticPartition and are handed
// to exec.AutoAnalyze together with the job's TableStatsVer.
func (j *TableAnalysisJob) analyzeStaticPartition(
	sctx sessionctx.Context,
	statsHandle statstypes.StatsHandle,
	sysProcTracker sessionctx.SysProcTracker,
) {
	stmt, args := j.GenSQLForAnalyzeStaticPartition()
	exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, stmt, args...)
}

func (j *TableAnalysisJob) analyzeIndexes(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
Expand All @@ -256,6 +369,17 @@ func (j *TableAnalysisJob) analyzeIndexes(
}
}

// analyzeStaticPartitionIndexes analyzes every index listed in j.Indexes for the job's
// static partition. Each index gets its own statement, built by
// GenSQLForAnalyzeStaticPartitionIndex, and the statements are executed one after another
// in slice order.
func (j *TableAnalysisJob) analyzeStaticPartitionIndexes(
	sctx sessionctx.Context,
	statsHandle statstypes.StatsHandle,
	sysProcTracker sessionctx.SysProcTracker,
) {
	// Capture the slice once so the loop walks the same list throughout.
	indexNames := j.Indexes
	for i := range indexNames {
		stmt, args := j.GenSQLForAnalyzeStaticPartitionIndex(indexNames[i])
		exec.AutoAnalyze(sctx, statsHandle, sysProcTracker, j.TableStatsVer, stmt, args...)
	}
}

// analyzePartitions performs analysis on the specified partitions.
// This function uses a batch mode for efficiency. After analyzing the partitions,
// it's necessary to merge their statistics. By analyzing them in batches,
Expand Down Expand Up @@ -283,8 +407,8 @@ func (j *TableAnalysisJob) analyzePartitions(
}
}

// AnalyzePartitionIndexes performs analysis on the specified partition indexes.
func (j *TableAnalysisJob) AnalyzePartitionIndexes(
// analyzePartitionIndexes performs analysis on the specified partition indexes.
func (j *TableAnalysisJob) analyzePartitionIndexes(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sessionctx.SysProcTracker,
Expand Down Expand Up @@ -332,6 +456,14 @@ func (j *TableAnalysisJob) GenSQLForAnalyzeTable() (string, []any) {
return sql, params
}

// GenSQLForAnalyzeStaticPartition returns the statement template used to analyze the job's
// static partition, along with its arguments. The arguments are the schema, the table name
// and the static partition name, in the same order as the %n placeholders in the template.
func (j *TableAnalysisJob) GenSQLForAnalyzeStaticPartition() (string, []any) {
	const stmt = "analyze table %n.%n partition %n"

	return stmt, []any{
		j.TableSchema,
		j.TableName,
		j.StaticPartitionName,
	}
}

// GenSQLForAnalyzeIndex generates the SQL for analyzing the specified index.
func (j *TableAnalysisJob) GenSQLForAnalyzeIndex(index string) (string, []any) {
sql := "analyze table %n.%n index %n"
Expand All @@ -340,6 +472,14 @@ func (j *TableAnalysisJob) GenSQLForAnalyzeIndex(index string) (string, []any) {
return sql, params
}

// GenSQLForAnalyzeStaticPartitionIndex returns the statement template used to analyze one
// index of the job's static partition, along with its arguments. The arguments are the
// schema, the table name, the static partition name and the given index, in the same
// order as the %n placeholders in the template.
func (j *TableAnalysisJob) GenSQLForAnalyzeStaticPartitionIndex(index string) (string, []any) {
	const stmt = "analyze table %n.%n partition %n index %n"

	return stmt, []any{
		j.TableSchema,
		j.TableName,
		j.StaticPartitionName,
		index,
	}
}

func (j *TableAnalysisJob) String() string {
analyzeType := j.getAnalyzeType()
switch analyzeType {
Expand All @@ -355,6 +495,12 @@ func (j *TableAnalysisJob) String() string {
case analyzePartitionIndex:
return fmt.Sprintf(`TableAnalysisJob: {AnalyzeType: partitionIndex, PartitionIndexes: %v, Schema: %s, Table: %s, TableID: %d, TableStatsVer: %d, ChangePercentage: %.2f, Weight: %.4f}`,
j.PartitionIndexes, j.TableSchema, j.TableName, j.TableID, j.TableStatsVer, j.ChangePercentage, j.Weight)
case analyzeStaticPartition:
return fmt.Sprintf(`TableAnalysisJob: {AnalyzeType: staticPartition, Schema: %s, Table: %s, TableID: %d, StaticPartition: %s, StaticPartitionID: %d, TableStatsVer: %d, ChangePercentage: %.2f, Weight: %.4f}`,
j.TableSchema, j.TableName, j.TableID, j.StaticPartitionName, j.StaticPartitionID, j.TableStatsVer, j.ChangePercentage, j.Weight)
case analyzeStaticPartitionIndex:
return fmt.Sprintf(`TableAnalysisJob: {AnalyzeType: staticPartitionIndex, Indexes: %s, Schema: %s, Table: %s, TableID: %d, StaticPartition: %s, StaticPartitionID: %d, TableStatsVer: %d, ChangePercentage: %.2f, Weight: %.4f}`,
strings.Join(j.Indexes, ", "), j.TableSchema, j.TableName, j.TableID, j.StaticPartitionName, j.StaticPartitionID, j.TableStatsVer, j.ChangePercentage, j.Weight)
default:
return "TableAnalysisJob: {AnalyzeType: unknown}"
}
Expand Down
Loading

0 comments on commit 9b255d5

Please sign in to comment.