Skip to content

Commit

Permalink
[improvement](statistics)Support auto analyze columns that haven't be…
Browse files Browse the repository at this point in the history
…en analyzed for a long time. (#42399)

Support auto analyze columns that haven't been analyzed for a long time.
Add a very low priority job queue for auto analyze to process this kind
of columns.

The purpose of this change is to make sure all tables could be auto
analyzed within a certain time. In the earlier Doris versions, users
often encounter this kind of issues:
User load some new data to a large table everyday, but the change rate
(percentage of new data) is very low, because there is a large size of
old data. In this case, auto analyze for this table will not be
triggered for a very long time, because the default trigger threshold of
auto analyze is 40% (more than 40% of the data in a table is changed
since last analyze). This will probably cause a bad plan because
min/max/ndv statistics are outdated.
  • Loading branch information
Jibing-Li authored and Your Name committed Dec 11, 2024
1 parent 178ea2a commit a2602bb
Show file tree
Hide file tree
Showing 18 changed files with 472 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2903,7 +2903,7 @@ public class Config extends ConfigBase {
"Columns that have not been collected within the specified interval will trigger automatic analyze. "
+ "0 means not trigger."
})
public static long auto_analyze_interval_seconds = 0;
public static long auto_analyze_interval_seconds = 86400; // 24 hours.

//==========================================================================
// begin of cloud config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* [TABLE]
* [
* WHERE
* [PRIORITY = ["HIGH"|"MID"|"LOW"]]
* [PRIORITY = ["HIGH"|"MID"|"LOW"|"VERY_LOW"]]
* ]
*/
public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser {
Expand Down Expand Up @@ -175,7 +175,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {

if (!valid) {
throw new AnalysisException("Where clause should looks like: "
+ "PRIORITY = \"HIGH|MID|LOW\"");
+ "PRIORITY = \"HIGH|MID|LOW|VERY_LOW\"");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class ShowColumnStatsStmt extends ShowStmt implements NotFallbackInParser
.add("updated_time")
.add("update_rows")
.add("last_analyze_row_count")
.add("last_analyze_version")
.build();

private static final ImmutableList<String> PARTITION_COLUMN_TITLE_NAMES =
Expand Down Expand Up @@ -185,6 +186,7 @@ public ShowResultSet constructResultSet(List<Pair<Pair<String, String>, ColumnSt
row.add(String.valueOf(p.second.updatedTime));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.updatedRows));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.rowCount));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.tableVersion));
result.add(row);
});
return new ShowResultSet(getMetaData(), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowTableStatsStmt extends ShowStmt implements NotFallbackInParser
.add("new_partition")
.add("user_inject")
.add("enable_auto_analyze")
.add("last_analyze_time")
.build();

private static final ImmutableList<String> PARTITION_TITLE_NAMES =
Expand Down Expand Up @@ -230,6 +231,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
row.add("");
row.add("");
row.add(String.valueOf(table.autoAnalyzeEnabled()));
row.add("");
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand All @@ -242,13 +244,16 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
LocalDateTime dateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
java.time.ZoneId.systemDefault());
String formattedDateTime = dateTime.format(formatter);
row.add(formattedDateTime);
LocalDateTime lastAnalyzeTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
java.time.ZoneId.systemDefault());
row.add(dateTime.format(formatter));
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
row.add(String.valueOf(tableStatistic.partitionChanged.get()));
row.add(String.valueOf(tableStatistic.userInjected));
row.add(table == null ? "N/A" : String.valueOf(table.autoAnalyzeEnabled()));
row.add(lastAnalyzeTime.format(formatter));
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand Down
11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -874,17 +874,6 @@ public Set<Column> getSchemaAllIndexes(boolean full) {
return columns;
}

public List<Column> getMvColumns(boolean full) {
List<Column> columns = Lists.newArrayList();
for (Long indexId : indexIdToMeta.keySet()) {
if (indexId == baseIndexId) {
continue;
}
columns.addAll(getSchemaByIndexId(indexId, full));
}
return columns;
}

public List<Column> getBaseSchemaKeyColumns() {
return getKeyColumnsByIndexId(baseIndexId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public enum ScheduleType {
@SerializedName("updateRows")
public final long updateRows;

@SerializedName("tv")
public final long tableVersion;

public final Map<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();

@SerializedName("tblUpdateTime")
Expand All @@ -206,8 +209,8 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForExternalTable, long tblUpdateTime, long rowCount, boolean userInject,
long updateRows, JobPriority priority, Map<Long, Long> partitionUpdateRows, boolean enablePartition) {
boolean usingSqlForExternalTable, long tblUpdateTime, long rowCount, boolean userInject, long updateRows,
long tableVersion, JobPriority priority, Map<Long, Long> partitionUpdateRows, boolean enablePartition) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -244,6 +247,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.rowCount = rowCount;
this.userInject = userInject;
this.updateRows = updateRows;
this.tableVersion = tableVersion;
this.priority = priority;
if (partitionUpdateRows != null) {
this.partitionUpdateRows.putAll(partitionUpdateRows);
Expand Down Expand Up @@ -292,6 +296,7 @@ public String toString() {
sj.add("rowCount: " + rowCount);
sj.add("userInject: " + userInject);
sj.add("updateRows: " + updateRows);
sj.add("tableVersion: " + tableVersion);
sj.add("priority: " + priority.name());
sj.add("enablePartition: " + enablePartition);
return sj.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class AnalysisInfoBuilder {
private long rowCount;
private boolean userInject = false;
private long updateRows;
private long tableVersion;
private JobPriority priority;
private Map<Long, Long> partitionUpdateRows;
private boolean enablePartition;
Expand Down Expand Up @@ -104,6 +105,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
rowCount = info.rowCount;
userInject = info.userInject;
updateRows = info.updateRows;
tableVersion = info.tableVersion;
priority = info.priority;
partitionUpdateRows = info.partitionUpdateRows;
enablePartition = info.enablePartition;
Expand Down Expand Up @@ -274,6 +276,11 @@ public AnalysisInfoBuilder setUpdateRows(long updateRows) {
return this;
}

public AnalysisInfoBuilder setTableVersion(long tableVersion) {
this.tableVersion = tableVersion;
return this;
}

public AnalysisInfoBuilder setPriority(JobPriority priority) {
this.priority = priority;
return this;
Expand All @@ -295,7 +302,7 @@ public AnalysisInfo build() {
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForExternalTable, tblUpdateTime, rowCount, userInject, updateRows,
priority, partitionUpdateRows, enablePartition);
tableVersion, priority, partitionUpdateRows, enablePartition);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public class AnalysisManager implements Writable {
public final Map<TableName, Set<Pair<String, String>>> highPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> midPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> lowPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> veryLowPriorityJobs = new LinkedHashMap<>();

// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -381,14 +382,15 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
}
infoBuilder.setColName(stringJoiner.toString());
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(System.currentTimeMillis());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
// Empty table row count is 0. Call fetchRowCount() when getRowCount() returns <= 0,
// because getRowCount may return <= 0 if cached is not loaded. This is mainly for external table.
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 :
(table.getRowCount() <= 0 ? table.fetchRowCount() : table.getRowCount());
infoBuilder.setRowCount(rowCount);
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get());
infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0);
infoBuilder.setPriority(JobPriority.MANUAL);
infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows);
infoBuilder.setEnablePartition(StatisticsUtil.enablePartitionAnalyze());
Expand Down Expand Up @@ -547,12 +549,15 @@ public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
result.addAll(getPendingJobs(veryLowPriorityJobs, JobPriority.VERY_LOW, tblName));
} else if (priority.equals(JobPriority.HIGH.name())) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
} else if (priority.equals(JobPriority.MID.name())) {
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
} else if (priority.equals(JobPriority.LOW.name())) {
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
} else if (priority.equals(JobPriority.VERY_LOW.name())) {
result.addAll(getPendingJobs(veryLowPriorityJobs, JobPriority.VERY_LOW, tblName));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,23 @@ public class ColStatsMeta {
@SerializedName("rowCount")
public long rowCount;

@SerializedName("tv")
public long tableVersion;

@SerializedName("pur")
public ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>();

public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType, JobType jobType,
long queriedTimes, long rowCount, long updatedRows, Map<Long, Long> partitionUpdateRows) {
public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, AnalysisType analysisType,
JobType jobType, long queriedTimes, long rowCount, long updatedRows,
long tableVersion, Map<Long, Long> partitionUpdateRows) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
this.updatedRows = updatedRows;
this.rowCount = rowCount;
this.tableVersion = tableVersion;
if (partitionUpdateRows != null) {
this.partitionUpdateRows.putAll(partitionUpdateRows);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum JobPriority {
HIGH,
MID,
LOW,
VERY_LOW,
MANUAL;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
Expand All @@ -37,7 +36,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -92,10 +90,11 @@ protected void runAfterCatalogReady() {
}

protected void collect() {
while (canCollect()) {
while (StatisticsUtil.canCollect()) {
Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> job = getJob();
if (job == null) {
// No more job to process, break and sleep.
LOG.info("No auto analyze jobs to process.");
break;
}
try {
Expand All @@ -112,11 +111,6 @@ protected void collect() {
}
}

protected boolean canCollect() {
return StatisticsUtil.enableAutoAnalyze()
&& StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}

protected Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> getJob() {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
Optional<Entry<TableName, Set<Pair<String, String>>>> job = fetchJobFromMap(manager.highPriorityJobs);
Expand All @@ -128,7 +122,11 @@ protected Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> getJob(
return Pair.of(job.get(), JobPriority.MID);
}
job = fetchJobFromMap(manager.lowPriorityJobs);
return job.map(entry -> Pair.of(entry, JobPriority.LOW)).orElse(null);
if (job.isPresent()) {
return Pair.of(job.get(), JobPriority.LOW);
}
job = fetchJobFromMap(manager.veryLowPriorityJobs);
return job.map(tableNameSetEntry -> Pair.of(tableNameSetEntry, JobPriority.VERY_LOW)).orElse(null);
}

protected Optional<Map.Entry<TableName, Set<Pair<String, String>>>> fetchJobFromMap(
Expand All @@ -142,9 +140,13 @@ protected Optional<Map.Entry<TableName, Set<Pair<String, String>>>> fetchJobFrom

protected void processOneJob(TableIf table, Set<Pair<String, String>> columns,
JobPriority priority) throws DdlException {
// appendMvColumn(table, columns);
appendAllColumns(table, columns);
columns = columns.stream().filter(c -> StatisticsUtil.needAnalyzeColumn(table, c)).collect(Collectors.toSet());
columns = columns.stream().filter(
c -> StatisticsUtil.needAnalyzeColumn(table, c) || StatisticsUtil.isLongTimeColumn(table, c))
.collect(Collectors.toSet());
if (columns.isEmpty()) {
return;
}
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority);
if (analyzeJob == null) {
return;
Expand Down Expand Up @@ -178,15 +180,6 @@ protected void appendAllColumns(TableIf table, Set<Pair<String, String>> columns
}
}

protected void appendMvColumn(TableIf table, Set<String> columns) {
if (!(table instanceof OlapTable)) {
return;
}
OlapTable olapTable = (OlapTable) table;
Set<String> mvColumns = olapTable.getMvColumns(false).stream().map(Column::getName).collect(Collectors.toSet());
columns.addAll(mvColumns);
}

protected boolean supportAutoAnalyze(TableIf tableIf) {
if (tableIf == null) {
return false;
Expand Down Expand Up @@ -248,9 +241,10 @@ protected AnalysisInfo createAnalyzeJobForTbl(
.setTaskIds(new ArrayList<>())
.setLastExecTimeInMs(System.currentTimeMillis())
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(System.currentTimeMillis())
.setTblUpdateTime(table.getUpdateTime())
.setRowCount(rowCount)
.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get())
.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0)
.setPriority(priority)
.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows)
.setEnablePartition(StatisticsUtil.enablePartitionAnalyze())
Expand All @@ -275,4 +269,8 @@ protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
future.get();
}
}

public boolean isReady() {
return waited;
}
}
Loading

0 comments on commit a2602bb

Please sign in to comment.