Skip to content

Commit

Permalink
Support auto analyze columns that haven't been analyzed for a long time.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Dec 11, 2024
1 parent d0737c4 commit 1ce4940
Show file tree
Hide file tree
Showing 23 changed files with 692 additions and 720 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2764,6 +2764,13 @@ public class Config extends ConfigBase {
public static boolean enable_proxy_protocol = false;
public static int profile_async_collect_expire_time_secs = 5;

@ConfField(mutable = true, description = {
"内表自动收集时间间隔,当某一列上次收集时间距离当前时间大于该值,则会触发一次新的收集,0表示不会触发。",
"Columns that have not been collected within the specified interval will trigger automatic analyze. "
+ "0 means not trigger."
})
public static long auto_analyze_interval_seconds = 86400;


//==========================================================================
// begin of cloud config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
.add("schedule_type")
.add("start_time")
.add("end_time")
.add("priority")
.build();

private long jobId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowColumnStatsStmt extends ShowStmt {
.add("trigger")
.add("query_times")
.add("updated_time")
.add("last_analyze_version")
.build();

private final TableName tableName;
Expand Down Expand Up @@ -162,6 +163,7 @@ public ShowResultSet constructResultSet(List<Pair<Pair<String, String>, ColumnSt
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes));
row.add(String.valueOf(p.second.updatedTime));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.tableVersion));
result.add(row);
});
return new ShowResultSet(getMetaData(), result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class ShowTableStatsStmt extends ShowStmt {
.add("trigger")
.add("new_partition")
.add("user_inject")
.add("last_analyze_time")
.build();

private static final ImmutableList<String> INDEX_TITLE_NAMES =
Expand Down Expand Up @@ -192,6 +193,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
row.add("");
row.add("");
row.add("");
row.add("");
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand All @@ -201,15 +203,18 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
row.add(String.valueOf(tableStatistic.updatedRows));
row.add(String.valueOf(tableStatistic.queriedTimes.get()));
row.add(String.valueOf(tableStatistic.rowCount));
LocalDateTime dateTime =
LocalDateTime tableUpdateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
java.time.ZoneId.systemDefault());
String formattedDateTime = dateTime.format(formatter);
row.add(formattedDateTime);
LocalDateTime lastAnalyzeTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
java.time.ZoneId.systemDefault());
row.add(tableUpdateTime.format(formatter));
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
row.add(String.valueOf(tableStatistic.newPartitionLoaded.get()));
row.add(String.valueOf(tableStatistic.userInjected));
row.add(lastAnalyzeTime.format(formatter));
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand Down
22 changes: 16 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.statistics.StatisticsJobAppender;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -540,6 +541,7 @@ public class Env {
private final LoadManagerAdapter loadManagerAdapter;

private StatisticsAutoCollector statisticsAutoCollector;
private StatisticsJobAppender statisticsJobAppender;

private HiveTransactionMgr hiveTransactionMgr;

Expand Down Expand Up @@ -780,6 +782,7 @@ public Env(boolean isCheckpointCatalog) {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsJobAppender = new StatisticsJobAppender("StatisticsJobAppender");
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
Expand Down Expand Up @@ -1078,12 +1081,6 @@ public void initialize(String[] args) throws Exception {
// If not using bdb, we need to notify the FE type transfer manually.
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
}
if (statisticsCleaner != null) {
statisticsCleaner.start();
}
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}

queryCancelWorker.start();
}
Expand Down Expand Up @@ -1623,6 +1620,15 @@ private void transferToMaster() {
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
if (statisticsCleaner != null) {
statisticsCleaner.start();
}
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}
if (statisticsJobAppender != null) {
statisticsJobAppender.start();
}
} catch (Throwable e) {
// When failed to transfer to master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
Expand Down Expand Up @@ -6327,6 +6333,10 @@ public StatisticsAutoCollector getStatisticsAutoCollector() {
return statisticsAutoCollector;
}

public StatisticsJobAppender getStatisticsJobAppender() {
return statisticsJobAppender;
}

public NereidsSqlCacheManager getSqlCacheManager() {
return sqlCacheManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
return true;
}
return System.currentTimeMillis()
- tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
- tblStats.lastAnalyzeTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2799,6 +2799,7 @@ private void handleShowAnalyze() {
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
row.add(analysisInfo.priority == null ? "N/A" : analysisInfo.priority.name());
resultRows.add(row);
} catch (Exception e) {
LOG.warn("Failed to get analyze info for table {}.{}.{}, reason: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2513,7 +2513,7 @@ private void handleUnsupportedStmt() {
context.getState().setOk();
}

private void handleAnalyzeStmt() throws DdlException, AnalysisException {
private void handleAnalyzeStmt() throws DdlException, AnalysisException, ExecutionException, InterruptedException {
context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) parsedStmt, isProxy);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ public enum ScheduleType {

@SerializedName("rowCount")
public final long rowCount;

@SerializedName("priority")
public final JobPriority priority;

@SerializedName("tv")
public final long tableVersion;

/**
*
* Used to store the newest partition version of tbl when creating this job.
Expand All @@ -214,7 +221,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject,
long rowCount) {
long rowCount, JobPriority priority, long tableVersion) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -253,6 +260,8 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.emptyJob = emptyJob;
this.userInject = userInject;
this.rowCount = rowCount;
this.priority = priority;
this.tableVersion = tableVersion;
}

@Override
Expand Down Expand Up @@ -295,6 +304,8 @@ public String toString() {
sj.add("forceFull: " + forceFull);
sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn);
sj.add("emptyJob: " + emptyJob);
sj.add("priority: " + priority.name());
sj.add("tableVersion: " + tableVersion);
return sj.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class AnalysisInfoBuilder {
private boolean emptyJob;
private boolean userInject = false;
private long rowCount;
private JobPriority priority;
private long tableVersion;

public AnalysisInfoBuilder() {
}
Expand Down Expand Up @@ -105,6 +107,8 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
emptyJob = info.emptyJob;
userInject = info.userInject;
rowCount = info.rowCount;
priority = info.priority;
tableVersion = info.tableVersion;
}

public AnalysisInfoBuilder setJobId(long jobId) {
Expand Down Expand Up @@ -282,12 +286,23 @@ public AnalysisInfoBuilder setRowCount(long rowCount) {
return this;
}

public AnalysisInfoBuilder setPriority(JobPriority priority) {
this.priority = priority;
return this;
}

public AnalysisInfoBuilder setTableVersion(long tableVersion) {
this.tableVersion = tableVersion;
return this;
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, jobColumns, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount);
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount,
priority, tableVersion);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -144,7 +145,8 @@ public StatisticsCache getStatisticsCache() {
return statisticsCache;
}

public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlException, AnalysisException {
public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy)
throws DdlException, AnalysisException, ExecutionException, InterruptedException {
if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) {
throw new DdlException("Stats table not available, please make sure your cluster status is normal");
}
Expand All @@ -157,10 +159,8 @@ public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlExce

public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException, AnalysisException {
DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
// Using auto analyzer if user specifies.
if (analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db);
return;
throw new DdlException("Analyze database doesn't support use.auto.analyzer property.");
}
List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties());
if (!analyzeDBStmt.isSync()) {
Expand Down Expand Up @@ -208,22 +208,12 @@ public List<AnalysisInfo> buildAnalysisInfosForDB(DatabaseIf<TableIf> db, Analyz
}

// Each analyze stmt corresponding to an analysis job.
public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException, AnalysisException {
public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy)
throws DdlException, AnalysisException, ExecutionException, InterruptedException {
// Using auto analyzer if user specifies.
if (stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
StatisticsAutoCollector autoCollector = Env.getCurrentEnv().getStatisticsAutoCollector();
if (autoCollector.skip(stmt.getTable())) {
return;
}
List<AnalysisInfo> jobs = new ArrayList<>();
autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs, stmt.getTable());
if (jobs.isEmpty()) {
return;
}
AnalysisInfo job = autoCollector.getNeedAnalyzeColumns(jobs.get(0));
if (job != null) {
Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job);
}
autoCollector.processOneJob(stmt.getTable(), JobPriority.MANUAL_AUTO);
return;
}
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
Expand Down Expand Up @@ -347,7 +337,6 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
infoBuilder.setAnalysisMode(analysisMode);
infoBuilder.setAnalysisMethod(analysisMethod);
infoBuilder.setScheduleType(scheduleType);
infoBuilder.setLastExecTimeInMs(0);
infoBuilder.setCronExpression(cronExpression);
infoBuilder.setForceFull(stmt.forceFull());
infoBuilder.setUsingSqlForPartitionColumn(stmt.usingSqlForPartitionColumn());
Expand Down Expand Up @@ -377,6 +366,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
&& analysisMethod.equals(AnalysisMethod.SAMPLE));
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount();
infoBuilder.setRowCount(rowCount);
infoBuilder.setPriority(JobPriority.MANUAL);
infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0);
return infoBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -88,9 +89,9 @@ protected void tryToCancel() {
}
}

public void submitTask(BaseAnalysisTask task) {
public Future<?> submitTask(BaseAnalysisTask task) {
AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
executors.submit(taskWrapper);
return executors.submit(taskWrapper);
}

public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ public class ColStatsMeta {
@SerializedName("trigger")
public JobType jobType;

@SerializedName("tv")
public long tableVersion;

public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
AnalysisType analysisType, JobType jobType, long queriedTimes) {
AnalysisType analysisType, JobType jobType, long queriedTimes, long tableVersion) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
this.tableVersion = tableVersion;
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

public enum JobPriority {
HIGH,
LOW,
MANUAL,
MANUAL_AUTO;
}
Loading

0 comments on commit 1ce4940

Please sign in to comment.