diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 97bcb39403a2513..98a503896536ea0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2863,7 +2863,7 @@ public class Config extends ConfigBase { "Columns that have not been collected within the specified interval will trigger automatic analyze. " + "0 means not trigger." }) - public static long auto_analyze_interval_seconds = 0; + public static long auto_analyze_interval_seconds = 79200; // 22 hours. //========================================================================== // begin of cloud config diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java index 9b07796df784d41..a9e36209200c628 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java @@ -40,7 +40,7 @@ * [TABLE] * [ * WHERE - * [PRIORITY = ["HIGH"|"MID"|"LOW"]] + * [PRIORITY = ["HIGH"|"MID"|"LOW"|"LONG_TIME"]] * ] */ public class ShowAutoAnalyzeJobsStmt extends ShowStmt implements NotFallbackInParser { @@ -175,7 +175,7 @@ private void analyzeSubPredicate(Expr subExpr) throws AnalysisException { if (!valid) { throw new AnalysisException("Where clause should looks like: " - + "PRIORITY = \"HIGH|MID|LOW\""); + + "PRIORITY = \"HIGH|MID|LOW|LONG_TIME\""); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java index ccffee3086dbead..ea9b96d0afecd49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java @@ -62,6 +62,7 @@ public class ShowTableStatsStmt extends ShowStmt implements NotFallbackInParser .add("new_partition") .add("user_inject") .add("enable_auto_analyze") + .add("last_analyze_time") .build(); private static final ImmutableList PARTITION_TITLE_NAMES = @@ -230,6 +231,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl row.add(""); row.add(""); row.add(String.valueOf(table.autoAnalyzeEnabled())); + row.add(""); result.add(row); return new ShowResultSet(getMetaData(), result); } @@ -242,13 +244,16 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime), java.time.ZoneId.systemDefault()); - String formattedDateTime = dateTime.format(formatter); - row.add(formattedDateTime); + LocalDateTime lastAnalyzeTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime), + java.time.ZoneId.systemDefault()); + row.add(dateTime.format(formatter)); row.add(tableStatistic.analyzeColumns().toString()); row.add(tableStatistic.jobType.toString()); row.add(String.valueOf(tableStatistic.partitionChanged.get())); row.add(String.valueOf(tableStatistic.userInjected)); row.add(table == null ? "N/A" : String.valueOf(table.autoAnalyzeEnabled())); + row.add(lastAnalyzeTime.format(formatter)); result.add(row); return new ShowResultSet(getMetaData(), result); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ef23713a5010b49..5b3d59435f3adcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -259,6 +259,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.FollowerColumnSender; +import org.apache.doris.statistics.LongTimeJobAppender; import org.apache.doris.statistics.StatisticsAutoCollector; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; @@ -552,6 +553,8 @@ public class Env { private StatisticsJobAppender statisticsJobAppender; + private LongTimeJobAppender longTimeJobAppender; + private FollowerColumnSender followerColumnSender; private HiveTransactionMgr hiveTransactionMgr; @@ -798,6 +801,7 @@ public Env(boolean isCheckpointCatalog) { this.statisticsCleaner = new StatisticsCleaner(); this.statisticsAutoCollector = new StatisticsAutoCollector(); this.statisticsJobAppender = new StatisticsJobAppender(); + this.longTimeJobAppender = new LongTimeJobAppender(); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); @@ -1849,6 +1853,7 @@ protected void startMasterOnlyDaemonThreads() { statisticsCleaner.start(); statisticsAutoCollector.start(); statisticsJobAppender.start(); + longTimeJobAppender.start(); } // start threads that should run on all FE @@ -6612,6 +6617,10 @@ public StatisticsJobAppender getStatisticsJobAppender() { return statisticsJobAppender; } + public LongTimeJobAppender getLongTimeJobAppender() { + return longTimeJobAppender; + } + public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) { AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO); this.alter.processAlterMTMV(alter, false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 999e0c43995f00a..d01ebceb85859f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -969,17 +969,6 @@ public Set getSchemaAllIndexes(boolean full) { return columns; } - public List getMvColumns(boolean full) { - List columns = Lists.newArrayList(); - for (Long indexId : indexIdToMeta.keySet()) { - if (indexId == baseIndexId) { - continue; - } - columns.addAll(getSchemaByIndexId(indexId, full)); - } - return columns; - } - public List getBaseSchemaKeyColumns() { return getKeyColumnsByIndexId(baseIndexId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index c2b20707f133e8e..4dbbdb5bad41ba4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -110,6 +110,7 @@ import java.util.StringJoiner; import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -130,6 +131,9 @@ public class AnalysisManager implements Writable { public final Map>> midPriorityJobs = new LinkedHashMap<>(); public final Map>> lowPriorityJobs = new LinkedHashMap<>(); + public static final int LONG_TIME_JOB_QUEUE_LIMIT = 10; + private final BlockingQueue longTimeJobs = new ArrayBlockingQueue<>(LONG_TIME_JOB_QUEUE_LIMIT); + // Tracking running manually submitted async tasks, keep in mem only protected final ConcurrentMap> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); @@ -381,7 +385,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) { } infoBuilder.setColName(stringJoiner.toString()); infoBuilder.setTaskIds(Lists.newArrayList()); - infoBuilder.setTblUpdateTime(System.currentTimeMillis()); + infoBuilder.setTblUpdateTime(table.getUpdateTime()); // Empty table row count is 0. Call fetchRowCount() when getRowCount() returns <= 0, // because getRowCount may return <= 0 if cached is not loaded. This is mainly for external table. long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : @@ -547,16 +551,29 @@ public List showAutoPendingJobs(ShowAutoAnalyzeJobsStmt result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName)); result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName)); result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName)); + showLongTimeJobs(result); } else if (priority.equals(JobPriority.HIGH.name())) { result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName)); } else if (priority.equals(JobPriority.MID.name())) { result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName)); } else if (priority.equals(JobPriority.LOW.name())) { result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName)); + } else if (priority.equals(JobPriority.LONG_TIME.name())) { + showLongTimeJobs(result); } return result; } + protected void showLongTimeJobs(List result) { + Object[] objects = longTimeJobs.toArray(); + for (Object object : objects) { + TableIf table = (TableIf) object; + Set> columnIndexPairs = StatisticsUtil.getLongTimeColumns(table); + result.add(new AutoAnalysisPendingJob(table.getDatabase().getCatalog().getName(), + table.getDatabase().getFullName(), table.getName(), columnIndexPairs, JobPriority.LONG_TIME)); + } + } + protected List getPendingJobs(Map>> jobMap, JobPriority priority, TableName tblName) { List result = Lists.newArrayList(); @@ -1482,4 +1499,42 @@ public void mergeFollowerQueryColumns(Collection highColumns, } } } + + public boolean appendToLongTimeJobs(TableIf table) throws InterruptedException { + Object[] objects = longTimeJobs.toArray(); + String catalogName = table.getDatabase().getCatalog().getName(); + String dbName = table.getDatabase().getFullName(); + String tableName = table.getName(); + for (Object o : objects) { + TableIf queuedTbl = (TableIf) o; + try { + if (catalogName.equals(queuedTbl.getDatabase().getCatalog().getName()) + && dbName.equals(queuedTbl.getDatabase().getFullName()) + && tableName.equals(queuedTbl.getName())) { + LOG.debug("Table {}.{}.{} already added to long time job queue, skip it.", + catalogName, dbName, tableName); + return false; + } + } catch (Exception e) { + LOG.warn("Failed to compare long time job for table {}, id {}", table.getName(), table.getId()); + } + } + longTimeJobs.put(table); + return true; + } + + public TableIf getLongTimeJob() { + try { + return longTimeJobs.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return null; + } + + // For unit test only + @VisibleForTesting + public BlockingQueue getLongTimeJobQueue() { + return longTimeJobs; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java index c3656b929279e6b..c289954dacd7b19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java @@ -21,5 +21,6 @@ public enum JobPriority { HIGH, MID, LOW, + LONG_TIME, MANUAL; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/LongTimeJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/LongTimeJobAppender.java new file mode 100644 index 000000000000000..2eec7f97a37482f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/LongTimeJobAppender.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * This is a daemon thread to append long time not analyzed columns to long time job queue in AnalysisManager. + * Using this separate thread instead of using the existing StatisticsJobAppender is to avoid blocking other + * higher priority jobs. + */ +public class LongTimeJobAppender extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(LongTimeJobAppender.class); + + public LongTimeJobAppender() { + super("LongTimeJobAppender", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes)); + } + + @Override + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.getCurrentEnv().getStatisticsAutoCollector() == null + || !Env.getCurrentEnv().getStatisticsAutoCollector().isReady()) { + LOG.info("Statistics auto collector not ready, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + if (!StatisticsUtil.canCollect()) { + LOG.debug("Auto analyze not enabled or not in analyze time range."); + return; + } + traverseAllTables(); + } + + protected int traverseAllTables() { + List catalogs = getCatalogsInOrder(); + AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); + int addedTableCount = 0; + for (CatalogIf ctl : catalogs) { + if (!StatisticsUtil.canCollect()) { + break; + } + if (!ctl.enableAutoAnalyze()) { + continue; + } + List dbs = getDatabasesInOrder(ctl); + for (DatabaseIf db : dbs) { + if (!StatisticsUtil.canCollect()) { + break; + } + if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) { + continue; + } + for (TableIf table : getTablesInOrder(db)) { + try { + if (appendTable(analysisManager, table)) { + addedTableCount++; + } + } catch (Throwable t) { + LOG.warn("Failed to analyze table {}.{}.{}", + db.getCatalog().getName(), db.getFullName(), table.getName(), t); + } + } + } + } + LOG.info("Finished one iteration of long time job. Append {} tables", addedTableCount); + return addedTableCount; + } + + public List getCatalogsInOrder() { + return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream() + .sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList()); + } + + public List> getDatabasesInOrder(CatalogIf catalog) { + return catalog.getAllDbs().stream() + .sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList()); + } + + public List getTablesInOrder(DatabaseIf db) { + return db.getTables().stream() + .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); + } + + // return true if skip auto analyze this time. + protected boolean skip(TableIf table) { + if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) { + return true; + } + // For now, only support Hive HMS table auto collection. + if (table instanceof HMSExternalTable + && !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + return true; + } + // Skip wide table. + return table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold(); + } + + protected boolean appendTable(AnalysisManager analysisManager, TableIf table) throws InterruptedException { + if (skip(table)) { + return false; + } + TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); + if (tblStats == null) { + return false; + } + Set> columns = tblStats.analyzeColumns(); + if (columns == null || columns.isEmpty()) { + return false; + } + for (Pair columnPair : columns) { + if (StatisticsUtil.isLongTimeColumn(table, columnPair)) { + if (analysisManager.appendToLongTimeJobs(table)) { + return true; + } + } + } + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 9ba52169605b435..205d53acbfc3ade 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -26,7 +26,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -37,7 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.LocalTime; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -92,10 +91,11 @@ protected void runAfterCatalogReady() { } protected void collect() { - while (canCollect()) { + while (StatisticsUtil.canCollect()) { Pair>>, JobPriority> job = getJob(); if (job == null) { // No more job to process, break and sleep. + LOG.info("No auto analyze jobs to process."); break; } try { @@ -112,11 +112,6 @@ protected void collect() { } } - protected boolean canCollect() { - return StatisticsUtil.enableAutoAnalyze() - && StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); - } - protected Pair>>, JobPriority> getJob() { AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); Optional>>> job = fetchJobFromMap(manager.highPriorityJobs); @@ -128,7 +123,11 @@ protected Pair>>, JobPriority> getJob( return Pair.of(job.get(), JobPriority.MID); } job = fetchJobFromMap(manager.lowPriorityJobs); - return job.map(entry -> Pair.of(entry, JobPriority.LOW)).orElse(null); + if (job.isPresent()) { + return Pair.of(job.get(), JobPriority.LOW); + } + job = fetchLongTimeJob(manager.getLongTimeJob()); + return job.map(entry -> Pair.of(entry, JobPriority.LONG_TIME)).orElse(null); } protected Optional>>> fetchJobFromMap( @@ -140,11 +139,25 @@ protected Optional>>> fetchJobFrom } } + protected Optional>>> fetchLongTimeJob(TableIf table) { + if (table == null) { + return Optional.empty(); + } + Set> columnIndexPairs = StatisticsUtil.getLongTimeColumns(table); + if (columnIndexPairs.isEmpty()) { + return Optional.empty(); + } + TableName tableName = new TableName(table.getDatabase().getCatalog().getName(), + table.getDatabase().getFullName(), table.getName()); + return Optional.of(new AbstractMap.SimpleEntry<>(tableName, columnIndexPairs)); + } + protected void processOneJob(TableIf table, Set> columns, JobPriority priority) throws DdlException { - // appendMvColumn(table, columns); appendAllColumns(table, columns); - columns = columns.stream().filter(c -> StatisticsUtil.needAnalyzeColumn(table, c)).collect(Collectors.toSet()); + columns = columns.stream().filter( + c -> StatisticsUtil.needAnalyzeColumn(table, c) || StatisticsUtil.isLongTimeColumn(table, c)) + .collect(Collectors.toSet()); if (columns.isEmpty()) { return; } @@ -181,15 +194,6 @@ protected void appendAllColumns(TableIf table, Set> columns } } - protected void appendMvColumn(TableIf table, Set columns) { - if (!(table instanceof OlapTable)) { - return; - } - OlapTable olapTable = (OlapTable) table; - Set mvColumns = olapTable.getMvColumns(false).stream().map(Column::getName).collect(Collectors.toSet()); - columns.addAll(mvColumns); - } - protected boolean supportAutoAnalyze(TableIf tableIf) { if (tableIf == null) { return false; @@ -238,7 +242,7 @@ protected AnalysisInfo createAnalyzeJobForTbl( .setTaskIds(new ArrayList<>()) .setLastExecTimeInMs(System.currentTimeMillis()) .setJobType(JobType.SYSTEM) - .setTblUpdateTime(System.currentTimeMillis()) + .setTblUpdateTime(table.getUpdateTime()) .setRowCount(rowCount) .setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()) .setPriority(priority) @@ -265,4 +269,8 @@ protected void executeSystemAnalysisJob(AnalysisInfo jobInfo) future.get(); } } + + public boolean isReady() { + return waited; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java index b67d1cf947cb4ec..8a69aa343e7e885 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -70,6 +70,11 @@ protected void runAfterCatalogReady() { LOG.info("Stats table not available, skip"); return; } + if (Env.getCurrentEnv().getStatisticsAutoCollector() == null + || !Env.getCurrentEnv().getStatisticsAutoCollector().isReady()) { + LOG.info("Statistics auto collector not ready, skip"); + return; + } if (Env.isCheckpointThread()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index ba23ab84dc7a322..ac4704b54c6aae6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -379,9 +379,8 @@ public static void alterColumnStatistics(AlterColumnStatsStmt alterColumnStatsSt objects.catalog.getId(), objects.db.getId(), objects.table.getId(), indexId, colName, null, columnStatistic); Env.getCurrentEnv().getStatisticsCache().syncColStats(data); - long timestamp = System.currentTimeMillis(); AnalysisInfo mockedJobInfo = new AnalysisInfoBuilder() - .setTblUpdateTime(timestamp) + .setTblUpdateTime(objects.table.getUpdateTime()) .setColName("") .setRowCount((long) Double.parseDouble(rowCount)) .setJobColumns(Sets.newHashSet()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 9c8959d807b1903..a7982ead728b991 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -80,6 +80,9 @@ public class TableStatsMeta implements Writable, GsonPostProcessable { @SerializedName("updateTime") public long updatedTime; + @SerializedName("lat") + public long lastAnalyzeTime; + @SerializedName("colNameToColStatsMeta") private ConcurrentMap deprecatedColNameToColStatsMeta = new ConcurrentHashMap<>(); @@ -160,6 +163,7 @@ public Set> analyzeColumns() { public void update(AnalysisInfo analyzedJob, TableIf tableIf) { updatedTime = analyzedJob.tblUpdateTime; + lastAnalyzeTime = analyzedJob.createTime; if (analyzedJob.userInject) { userInjected = true; } @@ -170,7 +174,7 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) { analyzedJob.analysisType, analyzedJob.jobType, 0, analyzedJob.rowCount, analyzedJob.updateRows, analyzedJob.enablePartition ? analyzedJob.partitionUpdateRows : null)); } else { - colStatsMeta.updatedTime = analyzedJob.tblUpdateTime; + colStatsMeta.updatedTime = analyzedJob.createTime; colStatsMeta.analysisType = analyzedJob.analysisType; colStatsMeta.analysisMethod = analyzedJob.analysisMethod; colStatsMeta.jobType = analyzedJob.jobType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index d51281eb0e667c3..6c6f5d6d0b11446 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -53,6 +53,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.InternalCatalog; @@ -1007,12 +1008,6 @@ public static boolean needAnalyzeColumn(TableIf table, Pair colu if (columnStatsMeta == null) { return true; } - // Column hasn't been analyzed for longer than config interval. - if (Config.auto_analyze_interval_seconds > 0 - && System.currentTimeMillis() - columnStatsMeta.updatedTime - > Config.auto_analyze_interval_seconds * 1000) { - return true; - } // Partition table partition stats never been collected. if (StatisticsUtil.enablePartitionAnalyze() && table.isPartitionedTable() && columnStatsMeta.partitionUpdateRows == null) { @@ -1067,7 +1062,7 @@ public static boolean needAnalyzeColumn(TableIf table, Pair colu } // External is hard to calculate change rate, use time interval to control analyze frequency. return System.currentTimeMillis() - - tableStatsStatus.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); + - tableStatsStatus.lastAnalyzeTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); } } @@ -1122,4 +1117,48 @@ public static boolean needAnalyzePartition(OlapTable table, TableStatsMeta table } return false; } + + // This function return true means the column has been analyzed before, and it's health rate is fine. + // But it hasn't been analyzed longer than the configured time. (22 hours by default.) + public static boolean isLongTimeColumn(TableIf table, Pair column) { + if (column == null) { + return false; + } + if (!table.autoAnalyzeEnabled()) { + return false; + } + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tblStats = manager.findTableStatsStatus(table.getId()); + // Table never been analyzed, skip it for higher priority jobs. + if (tblStats == null) { + LOG.warn("Table stats is null."); + return false; + } + ColStatsMeta columnStats = tblStats.findColumnStatsMeta(column.first, column.second); + if (columnStats == null) { + // Column never been analyzed, skip it for higher priority jobs. + return false; + } + // User injected column stats, don't do auto analyze, avoid overwrite user injected stats. + if (tblStats.userInjected) { + return false; + } + return Config.auto_analyze_interval_seconds > 0 + && System.currentTimeMillis() - columnStats.updatedTime > Config.auto_analyze_interval_seconds * 1000; + } + + public static boolean canCollect() { + return enableAutoAnalyze() && inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); + } + + public static Set> getLongTimeColumns(TableIf table) { + return table.getColumnIndexPairs(table.getSchemaAllIndexes(false).stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(Column::getName) + .collect(Collectors.toSet())) + .stream() + .filter(p -> !StatisticsUtil.needAnalyzeColumn(table, p)) + .filter(p -> StatisticsUtil.isLongTimeColumn(table, p)) + .collect(Collectors.toSet()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 898c3af0fdf0d2a..ea667f9fbf94157 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -27,9 +27,12 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; @@ -47,6 +50,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import mockit.Expectations; import mockit.Injectable; import mockit.Mock; @@ -63,6 +67,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; // CHECKSTYLE OFF @@ -666,4 +672,67 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set 10); Assertions.assertTrue(count.get() < 20); } + + @Test + public void testAppendToLongTimeJobs() throws InterruptedException { + List schema = Lists.newArrayList(new Column()); + OlapTable table1 = new OlapTable(200, "table1", schema, KeysType.DUP_KEYS, null, null); + Database db = new Database(100, "db1"); + db.registerTable(table1); + InternalCatalog internalCatalog = Env.getCurrentEnv().getInternalCatalog(); + internalCatalog.unprotectCreateDb(db); + + AnalysisManager manager = new AnalysisManager(); + Assertions.assertTrue(manager.appendToLongTimeJobs(table1)); + BlockingQueue longTimeJobs = manager.getLongTimeJobQueue(); + Assertions.assertEquals(1, longTimeJobs.size()); + Assertions.assertFalse(manager.appendToLongTimeJobs(table1)); + longTimeJobs = manager.getLongTimeJobQueue(); + Assertions.assertEquals(1, longTimeJobs.size()); + + OlapTable table2 = new OlapTable(201, "table2", schema, KeysType.DUP_KEYS, null, null); + db.registerTable(table2); + Assertions.assertTrue(manager.appendToLongTimeJobs(table2)); + Assertions.assertEquals(2, longTimeJobs.size()); + } + + @Test + public void testAppendToLongTimeJobsBlockedByQueue() throws InterruptedException { + List schema = Lists.newArrayList(new Column()); + Database db = new Database(100, "db1"); + InternalCatalog internalCatalog = Env.getCurrentEnv().getInternalCatalog(); + internalCatalog.unprotectCreateDb(db); + AnalysisManager manager = new AnalysisManager(); + AtomicBoolean threadDone = new AtomicBoolean(false); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i < 12; i++) { + OlapTable table = new OlapTable(i, "table" + i, schema, KeysType.DUP_KEYS, null, null); + db.registerTable(table); + try { + manager.appendToLongTimeJobs(table); + System.out.println("Append job " + i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + threadDone.set(true); + } + }); + thread.start(); + for (int i = 0; i < 100; i++) { + Thread.sleep(100); + BlockingQueue longTimeJobQueue = manager.getLongTimeJobQueue(); + if (longTimeJobQueue.size() == AnalysisManager.LONG_TIME_JOB_QUEUE_LIMIT) { + break; + } + } + Assertions.assertEquals(AnalysisManager.LONG_TIME_JOB_QUEUE_LIMIT, manager.getLongTimeJobQueue().size()); + Assertions.assertFalse(threadDone.get()); + manager.getLongTimeJob(); + thread.join(); + Assertions.assertTrue(threadDone.get()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/LongTimeJobAppenderTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/LongTimeJobAppenderTest.java new file mode 100644 index 000000000000000..57593ee4207128f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/LongTimeJobAppenderTest.java @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Mock; +import mockit.MockUp; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Set; + +class LongTimeJobAppenderTest { + @Test + public void testSkipTable() { + // Test skip table is not OlapTable or HMSExternalTable + JdbcExternalTable jdbcTable = new JdbcExternalTable(1, "table", "db", null); + LongTimeJobAppender appender = new LongTimeJobAppender(); + Assertions.assertTrue(appender.skip(jdbcTable)); + + // Test skip HMSExternalTable which is not hive. + HMSExternalTable hmsTable = new HMSExternalTable(1, "table", "db", null); + new MockUp() { + @Mock + public HMSExternalTable.DLAType getDlaType() { + return HMSExternalTable.DLAType.HUDI; + } + }; + Assertions.assertTrue(appender.skip(hmsTable)); + + // Test table column count is less than max width. + int maxWidth = StatisticsUtil.getAutoAnalyzeTableWidthThreshold(); + List schema = Lists.newArrayList(); + for (int i = 0; i < maxWidth; i++) { + schema.add(new Column()); + } + OlapTable olapTable = new OlapTable(); + new MockUp() { + @Mock + public List getBaseSchema() { + return schema; + } + }; + Assertions.assertFalse(appender.skip(olapTable)); + + // Test table is too wide + schema.add(new Column()); + Assertions.assertTrue(appender.skip(olapTable)); + } + + @Test + public void testAppendTable() throws InterruptedException { + // Test skip table + new MockUp() { + @Mock + public boolean skip(TableIf table) { + return true; + } + }; + LongTimeJobAppender appender = new LongTimeJobAppender(); + OlapTable olapTable = new OlapTable(); + Assertions.assertFalse(appender.appendTable(null, olapTable)); + new MockUp() { + @Mock + public boolean skip(TableIf table) { + return false; + } + }; + + // Test table stats meta is null + new MockUp() { + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return null; + } + }; + AnalysisManager manager = new AnalysisManager(); + Assertions.assertFalse(appender.appendTable(manager, olapTable)); + + // Test analyzed columns are empty. + TableStatsMeta tableMeta = new TableStatsMeta(); + new MockUp() { + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return tableMeta; + } + }; + new MockUp() { + @Mock + public Set> analyzeColumns() { + return null; + } + }; + Assertions.assertFalse(appender.appendTable(manager, olapTable)); + new MockUp() { + @Mock + public Set> analyzeColumns() { + return Sets.newHashSet(); + } + }; + Assertions.assertFalse(appender.appendTable(manager, olapTable)); + + // Test column analyzed within interval. + new MockUp() { + @Mock + public Set> analyzeColumns() { + return Sets.newHashSet(Pair.of("index1", "col1")); + } + + @Mock + public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { + return new ColStatsMeta(System.currentTimeMillis(), null, null, null, 0, 100, 0, null); + } + }; + Config.auto_analyze_interval_seconds = 86400; + Assertions.assertFalse(appender.appendTable(manager, olapTable)); + + // Test column not analyzed within interval. + new MockUp() { + @Mock + public Set> analyzeColumns() { + return Sets.newHashSet(Pair.of("index1", "col1")); + } + + @Mock + public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { + return new ColStatsMeta(0, null, null, null, 0, 100, 0, null); + } + }; + new MockUp() { + @Mock + public boolean appendToLongTimeJobs(TableIf table) throws InterruptedException { + return true; + } + }; + + Config.auto_analyze_interval_seconds = 86400; + Assertions.assertTrue(appender.appendTable(manager, olapTable)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java index 32521882939d348..b486fc33df127b8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java @@ -208,12 +208,6 @@ public TableStatsMeta findTableStatsStatus(long tblId) { Assertions.assertTrue(StatisticsUtil.needAnalyzeColumn(table, Pair.of("index", column.getName()))); // Test external table auto analyze enabled. - new MockUp() { - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return null; - } - }; externalCatalog.getCatalogProperty().addProperty(ExternalCatalog.ENABLE_AUTO_ANALYZE, "false"); HMSExternalTable hmsTable1 = new HMSExternalTable(1, "name", "dbName", externalCatalog); externalCatalog.setAutoAnalyzePolicy("dbName", "name", "enable"); @@ -238,23 +232,6 @@ public TableStatsMeta findTableStatsStatus(long tblId) { tableMeta.userInjected = false; Assertions.assertTrue(StatisticsUtil.needAnalyzeColumn(table, Pair.of("index", column.getName()))); - // Test column hasn't been analyzed for longer than 1 day. - new MockUp() { - @Mock - public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { - return new ColStatsMeta(0, null, null, null, 0, 100, 0, null); - } - }; - new MockUp() { - @Mock - public long getRowCount() { - return 100; - } - }; - Config.auto_analyze_interval_seconds = 60 * 60 * 24; - Assertions.assertTrue(StatisticsUtil.needAnalyzeColumn(table, Pair.of("index", column.getName()))); - Config.auto_analyze_interval_seconds = 0; - new MockUp() { @Mock public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { @@ -365,6 +342,90 @@ public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { tableMeta.partitionChanged.set(false); tableMeta.updatedRows.set(100); Assertions.assertFalse(StatisticsUtil.needAnalyzeColumn(table, Pair.of("index", column.getName()))); + } + + @Test + void testLongTimeNoAnalyze() throws InterruptedException { + Column column = new Column("testColumn", PrimitiveType.INT); + List schema = new ArrayList<>(); + schema.add(column); + OlapTable table = new OlapTable(200, "testTable", schema, null, null, null); + + // Test column is null + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, null)); + + // Test table auto analyze is disabled. + new MockUp() { + @Mock + public boolean autoAnalyzeEnabled() { + return false; + } + }; + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); + new MockUp() { + @Mock + public boolean autoAnalyzeEnabled() { + return true; + } + }; + // Test table stats meta is null. + new MockUp() { + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return null; + } + }; + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); + + // Test column stats meta is null + TableStatsMeta tableMeta = new TableStatsMeta(); + new MockUp() { + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return tableMeta; + } + }; + new MockUp() { + @Mock + public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { + return null; + } + }; + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); + new MockUp() { + @Mock + public ColStatsMeta findColumnStatsMeta(String indexName, String colName) {return new ColStatsMeta(System.currentTimeMillis(), null, null, null, 0, 100, 0, null); + } + }; + + // Test table stats is user injected + tableMeta.userInjected = true; + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); + tableMeta.userInjected = false; + + // Test Config.auto_analyze_interval_seconds == 0 + Config.auto_analyze_interval_seconds = 0; + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); + + // Test column analyzed within the time interval + Config.auto_analyze_interval_seconds = 86400; + Assertions.assertFalse(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); + + // Test column hasn't analyzed for longer than time interval. + new MockUp() { + @Mock + public ColStatsMeta findColumnStatsMeta(String indexName, String colName) { + ColStatsMeta ret = new ColStatsMeta(System.currentTimeMillis(), null, null, null, 0, 100, 0, null); + try { + Thread.sleep(1500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return ret; + } + }; + Config.auto_analyze_interval_seconds = 1; + Assertions.assertTrue(StatisticsUtil.isLongTimeColumn(table, Pair.of("index", column.getName()))); } }