Skip to content

Commit

Permalink
Support auto analyze columns that haven't been analyzed for a long time.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Oct 25, 2024
1 parent 38313fc commit b941eae
Show file tree
Hide file tree
Showing 16 changed files with 641 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2863,7 +2863,7 @@ public class Config extends ConfigBase {
"Columns that have not been collected within the specified interval will trigger automatic analyze. "
+ "0 means not trigger."
})
public static long auto_analyze_interval_seconds = 0;
public static long auto_analyze_interval_seconds = 79200; // 22 hours.

//==========================================================================
// begin of cloud config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* [TABLE]
* [
* WHERE
* [PRIORITY = ["HIGH"|"MID"|"LOW"]]
* [PRIORITY = ["HIGH"|"MID"|"LOW"|"LONG_TIME"]]
* ]
*/
public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser {
Expand Down Expand Up @@ -175,7 +175,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {

if (!valid) {
throw new AnalysisException("Where clause should looks like: "
+ "PRIORITY = \"HIGH|MID|LOW\"");
+ "PRIORITY = \"HIGH|MID|LOW|LONG_TIME\"");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowTableStatsStmt extends ShowStmt implements NotFallbackInParser
.add("new_partition")
.add("user_inject")
.add("enable_auto_analyze")
.add("last_analyze_time")
.build();

private static final ImmutableList<String> PARTITION_TITLE_NAMES =
Expand Down Expand Up @@ -230,6 +231,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
row.add("");
row.add("");
row.add(String.valueOf(table.autoAnalyzeEnabled()));
row.add("");
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand All @@ -242,13 +244,16 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl
LocalDateTime dateTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime),
java.time.ZoneId.systemDefault());
String formattedDateTime = dateTime.format(formatter);
row.add(formattedDateTime);
LocalDateTime lastAnalyzeTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime),
java.time.ZoneId.systemDefault());
row.add(dateTime.format(formatter));
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
row.add(String.valueOf(tableStatistic.partitionChanged.get()));
row.add(String.valueOf(tableStatistic.userInjected));
row.add(table == null ? "N/A" : String.valueOf(table.autoAnalyzeEnabled()));
row.add(lastAnalyzeTime.format(formatter));
result.add(row);
return new ShowResultSet(getMetaData(), result);
}
Expand Down
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.FollowerColumnSender;
import org.apache.doris.statistics.LongTimeJobAppender;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
Expand Down Expand Up @@ -552,6 +553,8 @@ public class Env {

private StatisticsJobAppender statisticsJobAppender;

private LongTimeJobAppender longTimeJobAppender;

private FollowerColumnSender followerColumnSender;

private HiveTransactionMgr hiveTransactionMgr;
Expand Down Expand Up @@ -798,6 +801,7 @@ public Env(boolean isCheckpointCatalog) {
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsJobAppender = new StatisticsJobAppender();
this.longTimeJobAppender = new LongTimeJobAppender();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
Expand Down Expand Up @@ -1849,6 +1853,7 @@ protected void startMasterOnlyDaemonThreads() {
statisticsCleaner.start();
statisticsAutoCollector.start();
statisticsJobAppender.start();
longTimeJobAppender.start();
}

// start threads that should run on all FE
Expand Down Expand Up @@ -6612,6 +6617,10 @@ public StatisticsJobAppender getStatisticsJobAppender() {
return statisticsJobAppender;
}

public LongTimeJobAppender getLongTimeJobAppender() {
return longTimeJobAppender;
}

public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
this.alter.processAlterMTMV(alter, false);
Expand Down
11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -969,17 +969,6 @@ public Set<Column> getSchemaAllIndexes(boolean full) {
return columns;
}

public List<Column> getMvColumns(boolean full) {
List<Column> columns = Lists.newArrayList();
for (Long indexId : indexIdToMeta.keySet()) {
if (indexId == baseIndexId) {
continue;
}
columns.addAll(getSchemaByIndexId(indexId, full));
}
return columns;
}

public List<Column> getBaseSchemaKeyColumns() {
return getKeyColumnsByIndexId(baseIndexId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -130,6 +131,9 @@ public class AnalysisManager implements Writable {
public final Map<TableName, Set<Pair<String, String>>> midPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> lowPriorityJobs = new LinkedHashMap<>();

public static final int LONG_TIME_JOB_QUEUE_LIMIT = 10;
private final BlockingQueue<TableIf> longTimeJobs = new ArrayBlockingQueue<>(LONG_TIME_JOB_QUEUE_LIMIT);

// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -381,7 +385,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
}
infoBuilder.setColName(stringJoiner.toString());
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(System.currentTimeMillis());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
// Empty table row count is 0. Call fetchRowCount() when getRowCount() returns <= 0,
// because getRowCount may return <= 0 if cached is not loaded. This is mainly for external table.
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 :
Expand Down Expand Up @@ -547,16 +551,29 @@ public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
showLongTimeJobs(result);
} else if (priority.equals(JobPriority.HIGH.name())) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
} else if (priority.equals(JobPriority.MID.name())) {
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
} else if (priority.equals(JobPriority.LOW.name())) {
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
} else if (priority.equals(JobPriority.LONG_TIME.name())) {
showLongTimeJobs(result);
}
return result;
}

protected void showLongTimeJobs(List<AutoAnalysisPendingJob> result) {
Object[] objects = longTimeJobs.toArray();
for (Object object : objects) {
TableIf table = (TableIf) object;
Set<Pair<String, String>> columnIndexPairs = StatisticsUtil.getLongTimeColumns(table);
result.add(new AutoAnalysisPendingJob(table.getDatabase().getCatalog().getName(),
table.getDatabase().getFullName(), table.getName(), columnIndexPairs, JobPriority.LONG_TIME));
}
}

protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<Pair<String, String>>> jobMap,
JobPriority priority, TableName tblName) {
List<AutoAnalysisPendingJob> result = Lists.newArrayList();
Expand Down Expand Up @@ -1482,4 +1499,42 @@ public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
}
}
}

public boolean appendToLongTimeJobs(TableIf table) throws InterruptedException {
Object[] objects = longTimeJobs.toArray();
String catalogName = table.getDatabase().getCatalog().getName();
String dbName = table.getDatabase().getFullName();
String tableName = table.getName();
for (Object o : objects) {
TableIf queuedTbl = (TableIf) o;
try {
if (catalogName.equals(queuedTbl.getDatabase().getCatalog().getName())
&& dbName.equals(queuedTbl.getDatabase().getFullName())
&& tableName.equals(queuedTbl.getName())) {
LOG.debug("Table {}.{}.{} already added to long time job queue, skip it.",
catalogName, dbName, tableName);
return false;
}
} catch (Exception e) {
LOG.warn("Failed to compare long time job for table {}, id {}", table.getName(), table.getId());
}
}
longTimeJobs.put(table);
return true;
}

public TableIf getLongTimeJob() {
try {
return longTimeJobs.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}

// For unit test only
@VisibleForTesting
public BlockingQueue<TableIf> getLongTimeJobQueue() {
return longTimeJobs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public enum JobPriority {
HIGH,
MID,
LOW,
LONG_TIME,
MANUAL;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* This is a daemon thread to append long time not analyzed columns to long time job queue in AnalysisManager.
* Using this separate thread instead of using the existing StatisticsJobAppender is to avoid blocking other
* higher priority jobs.
*/
public class LongTimeJobAppender extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(LongTimeJobAppender.class);

public LongTimeJobAppender() {
super("LongTimeJobAppender", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
}

@Override
protected void runAfterCatalogReady() {
if (!Env.getCurrentEnv().isMaster()) {
return;
}
if (!StatisticsUtil.statsTblAvailable()) {
LOG.info("Stats table not available, skip");
return;
}
if (Env.getCurrentEnv().getStatisticsAutoCollector() == null
|| !Env.getCurrentEnv().getStatisticsAutoCollector().isReady()) {
LOG.info("Statistics auto collector not ready, skip");
return;
}
if (Env.isCheckpointThread()) {
return;
}
if (!StatisticsUtil.canCollect()) {
LOG.debug("Auto analyze not enabled or not in analyze time range.");
return;
}
traverseAllTables();
}

protected int traverseAllTables() {
List<CatalogIf> catalogs = getCatalogsInOrder();
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
int addedTableCount = 0;
for (CatalogIf ctl : catalogs) {
if (!StatisticsUtil.canCollect()) {
break;
}
if (!ctl.enableAutoAnalyze()) {
continue;
}
List<DatabaseIf> dbs = getDatabasesInOrder(ctl);
for (DatabaseIf<TableIf> db : dbs) {
if (!StatisticsUtil.canCollect()) {
break;
}
if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) {
continue;
}
for (TableIf table : getTablesInOrder(db)) {
try {
if (appendTable(analysisManager, table)) {
addedTableCount++;
}
} catch (Throwable t) {
LOG.warn("Failed to analyze table {}.{}.{}",
db.getCatalog().getName(), db.getFullName(), table.getName(), t);
}
}
}
}
LOG.info("Finished one iteration of long time job. Append {} tables", addedTableCount);
return addedTableCount;
}

public List<CatalogIf> getCatalogsInOrder() {
return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream()
.sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList());
}

public List<DatabaseIf<? extends TableIf>> getDatabasesInOrder(CatalogIf<DatabaseIf> catalog) {
return catalog.getAllDbs().stream()
.sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList());
}

public List<TableIf> getTablesInOrder(DatabaseIf<? extends TableIf> db) {
return db.getTables().stream()
.sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList());
}

// return true if skip auto analyze this time.
protected boolean skip(TableIf table) {
if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) {
return true;
}
// For now, only support Hive HMS table auto collection.
if (table instanceof HMSExternalTable
&& !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
return true;
}
// Skip wide table.
return table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold();
}

protected boolean appendTable(AnalysisManager analysisManager, TableIf table) throws InterruptedException {
if (skip(table)) {
return false;
}
TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId());
if (tblStats == null) {
return false;
}
Set<Pair<String, String>> columns = tblStats.analyzeColumns();
if (columns == null || columns.isEmpty()) {
return false;
}
for (Pair<String, String> columnPair : columns) {
if (StatisticsUtil.isLongTimeColumn(table, columnPair)) {
if (analysisManager.appendToLongTimeJobs(table)) {
return true;
}
}
}
return false;
}
}
Loading

0 comments on commit b941eae

Please sign in to comment.