diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 359d193a5ba914c..ec09c8508057130 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2764,6 +2764,13 @@ public class Config extends ConfigBase { public static boolean enable_proxy_protocol = false; public static int profile_async_collect_expire_time_secs = 5; + @ConfField(mutable = true, description = { + "内表自动收集时间间隔,当某一列上次收集时间距离当前时间大于该值,则会触发一次新的收集,0表示不会触发。", + "Columns that have not been collected within the specified interval will trigger automatic analyze. " + + "0 means not trigger." + }) + public static long auto_analyze_interval_seconds = 86400; + //========================================================================== // begin of cloud config diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java index 9ccfd956ca5d844..f660d6eeb3c6b5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java @@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt { .add("schedule_type") .add("start_time") .add("end_time") + .add("priority") .build(); private long jobId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index 18bb916b8bdfce3..36986dc9d4ea8ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -62,6 +62,7 @@ public class ShowColumnStatsStmt extends ShowStmt { .add("trigger") .add("query_times") .add("updated_time") + .add("last_analyze_version") .build(); private final 
TableName tableName; @@ -162,6 +163,7 @@ public ShowResultSet constructResultSet(List, ColumnSt row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType)); row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes)); row.add(String.valueOf(p.second.updatedTime)); + row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.tableVersion)); result.add(row); }); return new ShowResultSet(getMetaData(), result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java index 91b8bf1de2db048..915b5f19e033bb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java @@ -57,6 +57,7 @@ public class ShowTableStatsStmt extends ShowStmt { .add("trigger") .add("new_partition") .add("user_inject") + .add("last_analyze_time") .build(); private static final ImmutableList INDEX_TITLE_NAMES = @@ -192,6 +193,7 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl row.add(""); row.add(""); row.add(""); + row.add(""); result.add(row); return new ShowResultSet(getMetaData(), result); } @@ -201,15 +203,18 @@ public ShowResultSet constructTableResultSet(TableStatsMeta tableStatistic, Tabl row.add(String.valueOf(tableStatistic.updatedRows)); row.add(String.valueOf(tableStatistic.queriedTimes.get())); row.add(String.valueOf(tableStatistic.rowCount)); - LocalDateTime dateTime = + LocalDateTime tableUpdateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.updatedTime), java.time.ZoneId.systemDefault()); - String formattedDateTime = dateTime.format(formatter); - row.add(formattedDateTime); + LocalDateTime lastAnalyzeTime = + LocalDateTime.ofInstant(Instant.ofEpochMilli(tableStatistic.lastAnalyzeTime), + java.time.ZoneId.systemDefault()); + row.add(tableUpdateTime.format(formatter)); 
row.add(tableStatistic.analyzeColumns().toString()); row.add(tableStatistic.jobType.toString()); row.add(String.valueOf(tableStatistic.newPartitionLoaded.get())); row.add(String.valueOf(tableStatistic.userInjected)); + row.add(lastAnalyzeTime.format(formatter)); result.add(row); return new ShowResultSet(getMetaData(), result); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index cf0885e6ec29ea6..321d2f53fc7fd15 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -258,6 +258,7 @@ import org.apache.doris.statistics.StatisticsAutoCollector; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsCleaner; +import org.apache.doris.statistics.StatisticsJobAppender; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -540,6 +541,7 @@ public class Env { private final LoadManagerAdapter loadManagerAdapter; private StatisticsAutoCollector statisticsAutoCollector; + private StatisticsJobAppender statisticsJobAppender; private HiveTransactionMgr hiveTransactionMgr; @@ -780,6 +782,7 @@ public Env(boolean isCheckpointCatalog) { this.analysisManager = new AnalysisManager(); this.statisticsCleaner = new StatisticsCleaner(); this.statisticsAutoCollector = new StatisticsAutoCollector(); + this.statisticsJobAppender = new StatisticsJobAppender("StatisticsJobAppender"); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); @@ -1078,12 +1081,6 @@ public void initialize(String[] args) throws Exception { // If not using bdb, we need to notify the FE type transfer manually. 
notifyNewFETypeTransfer(FrontendNodeType.MASTER); } - if (statisticsCleaner != null) { - statisticsCleaner.start(); - } - if (statisticsAutoCollector != null) { - statisticsAutoCollector.start(); - } queryCancelWorker.start(); } @@ -1623,6 +1620,15 @@ private void transferToMaster() { if (analysisManager != null) { analysisManager.getStatisticsCache().preHeat(); } + if (statisticsCleaner != null) { + statisticsCleaner.start(); + } + if (statisticsAutoCollector != null) { + statisticsAutoCollector.start(); + } + if (statisticsJobAppender != null) { + statisticsJobAppender.start(); + } } catch (Throwable e) { // When failed to transfer to master, we need to exit the process. // Otherwise, the process will be in an unknown state. @@ -6327,6 +6333,10 @@ public StatisticsAutoCollector getStatisticsAutoCollector() { return statisticsAutoCollector; } + public StatisticsJobAppender getStatisticsJobAppender() { + return statisticsJobAppender; + } + public NereidsSqlCacheManager getSqlCacheManager() { return sqlCacheManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 1d7d87d27f817fb..716f2bdca445d19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -350,7 +350,7 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) { return true; } return System.currentTimeMillis() - - tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); + - tblStats.lastAnalyzeTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 741f20dd88fb5ec..fc92efd7f1c8e6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -2799,6 +2799,7 @@ private void handleShowAnalyze() { java.time.ZoneId.systemDefault()); row.add(startTime.format(formatter)); row.add(endTime.format(formatter)); + row.add(analysisInfo.priority == null ? "N/A" : analysisInfo.priority.name()); resultRows.add(row); } catch (Exception e) { LOG.warn("Failed to get analyze info for table {}.{}.{}, reason: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 0b8d8c229cdfc1f..f314f3aa76ce14d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2513,7 +2513,7 @@ private void handleUnsupportedStmt() { context.getState().setOk(); } - private void handleAnalyzeStmt() throws DdlException, AnalysisException { + private void handleAnalyzeStmt() throws DdlException, AnalysisException, ExecutionException, InterruptedException { context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) parsedStmt, isProxy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index 125b23bce7babb9..463b53bf64577e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -195,6 +195,13 @@ public enum ScheduleType { @SerializedName("rowCount") public final long rowCount; + + @SerializedName("priority") + public final JobPriority priority; + + @SerializedName("tv") + public final long tableVersion; + /** * * Used to store the newest partition version of tbl when creating this job. 
@@ -214,7 +221,7 @@ public AnalysisInfo(long jobId, long taskId, List taskIds, long catalogId, boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition, boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull, boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject, - long rowCount) { + long rowCount, JobPriority priority, long tableVersion) { this.jobId = jobId; this.taskId = taskId; this.taskIds = taskIds; @@ -253,6 +260,8 @@ public AnalysisInfo(long jobId, long taskId, List taskIds, long catalogId, this.emptyJob = emptyJob; this.userInject = userInject; this.rowCount = rowCount; + this.priority = priority; + this.tableVersion = tableVersion; } @Override @@ -295,6 +304,8 @@ public String toString() { sj.add("forceFull: " + forceFull); sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn); sj.add("emptyJob: " + emptyJob); + sj.add("priority: " + (priority == null ? "N/A" : priority.name())); + sj.add("tableVersion: " + tableVersion); return sj.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 2e7c4078ca15fe9..2dd79030220bbeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -65,6 +65,8 @@ public class AnalysisInfoBuilder { private boolean emptyJob; private boolean userInject = false; private long rowCount; + private JobPriority priority; + private long tableVersion; public AnalysisInfoBuilder() { } @@ -105,6 +107,8 @@ public AnalysisInfoBuilder(AnalysisInfo info) { emptyJob = info.emptyJob; userInject = info.userInject; rowCount = info.rowCount; + priority = info.priority; + tableVersion = info.tableVersion; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -282,12 +286,23 @@ public AnalysisInfoBuilder
setRowCount(long rowCount) { return this; } + public AnalysisInfoBuilder setPriority(JobPriority priority) { + this.priority = priority; + return this; + } + + public AnalysisInfoBuilder setTableVersion(long tableVersion) { + this.tableVersion = tableVersion; + return this; + } + public AnalysisInfo build() { return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, jobColumns, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount, - cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount); + cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount, + priority, tableVersion); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 5f6206e854e8017..0e4a1c7b42d833d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -99,6 +99,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -144,7 +145,8 @@ public StatisticsCache getStatisticsCache() { return statisticsCache; } - public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlException, AnalysisException { + public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) + throws DdlException, AnalysisException, ExecutionException, InterruptedException { if 
(!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) { throw new DdlException("Stats table not available, please make sure your cluster status is normal"); } @@ -157,10 +159,8 @@ public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlExce public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException, AnalysisException { DatabaseIf db = analyzeDBStmt.getDb(); - // Using auto analyzer if user specifies. if (analyzeDBStmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) { - Env.getCurrentEnv().getStatisticsAutoCollector().analyzeDb(db); - return; + throw new DdlException("Analyze database doesn't support use.auto.analyzer property."); } List analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties()); if (!analyzeDBStmt.isSync()) { @@ -208,22 +208,12 @@ public List buildAnalysisInfosForDB(DatabaseIf db, Analyz } // Each analyze stmt corresponding to an analysis job. - public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException, AnalysisException { + public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) + throws DdlException, AnalysisException, ExecutionException, InterruptedException { // Using auto analyzer if user specifies. 
if (stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) { StatisticsAutoCollector autoCollector = Env.getCurrentEnv().getStatisticsAutoCollector(); - if (autoCollector.skip(stmt.getTable())) { - return; - } - List jobs = new ArrayList<>(); - autoCollector.createAnalyzeJobForTbl(stmt.getDb(), jobs, stmt.getTable()); - if (jobs.isEmpty()) { - return; - } - AnalysisInfo job = autoCollector.getNeedAnalyzeColumns(jobs.get(0)); - if (job != null) { - Env.getCurrentEnv().getStatisticsAutoCollector().createSystemAnalysisJob(job); - } + autoCollector.processOneJob(stmt.getTable(), JobPriority.MANUAL_AUTO); return; } AnalysisInfo jobInfo = buildAndAssignJob(stmt); @@ -347,7 +337,6 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio infoBuilder.setAnalysisMode(analysisMode); infoBuilder.setAnalysisMethod(analysisMethod); infoBuilder.setScheduleType(scheduleType); - infoBuilder.setLastExecTimeInMs(0); infoBuilder.setCronExpression(cronExpression); infoBuilder.setForceFull(stmt.forceFull()); infoBuilder.setUsingSqlForPartitionColumn(stmt.usingSqlForPartitionColumn()); @@ -377,6 +366,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio && analysisMethod.equals(AnalysisMethod.SAMPLE)); long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount(); infoBuilder.setRowCount(rowCount); + infoBuilder.setPriority(JobPriority.MANUAL); + infoBuilder.setTableVersion(table instanceof OlapTable ? 
((OlapTable) table).getVisibleVersion() : 0); return infoBuilder.build(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 5277d8025fc7709..3db9a862d100f43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -27,6 +27,7 @@ import java.util.Comparator; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -88,9 +89,9 @@ protected void tryToCancel() { } } - public void submitTask(BaseAnalysisTask task) { + public Future submitTask(BaseAnalysisTask task) { AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task); - executors.submit(taskWrapper); + return executors.submit(taskWrapper); } public void putJob(AnalysisTaskWrapper wrapper) throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java index 445641b25056106..f9da52b01cc3abc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsMeta.java @@ -43,13 +43,17 @@ public class ColStatsMeta { @SerializedName("trigger") public JobType jobType; + @SerializedName("tv") + public long tableVersion; + public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod, - AnalysisType analysisType, JobType jobType, long queriedTimes) { + AnalysisType analysisType, JobType jobType, long queriedTimes, long tableVersion) { this.updatedTime = updatedTime; this.analysisMethod = analysisMethod; this.analysisType = analysisType; this.jobType = jobType; 
this.queriedTimes.addAndGet(queriedTimes); + this.tableVersion = tableVersion; } public void clear() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java new file mode 100644 index 000000000000000..2d45dad877bea03 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +public enum JobPriority { + HIGH, + LOW, + MANUAL, + MANUAL_AUTO; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index c1a8af93ac032a1..1d7818a8e8d6af1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -18,172 +18,125 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.time.LocalTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.StringJoiner; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class StatisticsAutoCollector extends StatisticsCollector { +public class 
StatisticsAutoCollector extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(StatisticsAutoCollector.class); + public static final int JOB_QUEUE_LIMIT = 100; + private final BlockingQueue highPriorityJobs = new ArrayBlockingQueue<>(JOB_QUEUE_LIMIT); + private final BlockingQueue lowPriorityJobs = new ArrayBlockingQueue<>(JOB_QUEUE_LIMIT); + + protected final AnalysisTaskExecutor analysisTaskExecutor; + public StatisticsAutoCollector() { - super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes), - new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, - StatisticConstants.TASK_QUEUE_CAP)); + super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(10)); + this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, + StatisticConstants.TASK_QUEUE_CAP); } @Override - protected void collect() { - if (canCollect()) { - analyzeAll(); + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + try { + collect(); + } catch (DdlException | ExecutionException | InterruptedException e) { + LOG.warn("One auto analyze job failed. 
", e); } } - protected boolean canCollect() { - return StatisticsUtil.enableAutoAnalyze() - && StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); - } - - protected void analyzeAll() { - List catalogs = getCatalogsInOrder(); - for (CatalogIf ctl : catalogs) { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - if (!ctl.enableAutoAnalyze()) { - continue; + protected void collect() throws DdlException, ExecutionException, InterruptedException { + if (!StatisticsUtil.canCollect()) { + return; + } + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + while (true) { + Pair jobPair = fetchOneJob(); + TableIf table = jobPair.first; + if (table == null) { + return; } - List dbs = getDatabasesInOrder(ctl); - for (DatabaseIf databaseIf : dbs) { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - if (StatisticConstants.SYSTEM_DBS.contains(databaseIf.getFullName())) { - continue; - } - try { - analyzeDb(databaseIf); - } catch (Throwable t) { - LOG.warn("Failed to analyze database {}.{}", ctl.getName(), databaseIf.getFullName(), t); - continue; - } + TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); + if (table.needReAnalyzeTable(tblStats) || StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) { + processOneJob(table, jobPair.second); } } } - public List getCatalogsInOrder() { - return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream() - .sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList()); - } - - public List> getDatabasesInOrder(CatalogIf catalog) { - return catalog.getAllDbs().stream() - .sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList()); - } - - public List getTablesInOrder(DatabaseIf db) { - return db.getTables().stream() - .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); - } - - public void analyzeDb(DatabaseIf databaseIf) throws 
DdlException { - List analysisInfos = constructAnalysisInfo(databaseIf); - for (AnalysisInfo analysisInfo : analysisInfos) { - try { - if (!canCollect()) { - analysisTaskExecutor.clear(); - break; - } - analysisInfo = getNeedAnalyzeColumns(analysisInfo); - if (analysisInfo == null) { - continue; - } - createSystemAnalysisJob(analysisInfo); - } catch (Throwable t) { - analysisInfo.message = t.getMessage(); - LOG.warn("Failed to auto analyze table {}.{}, reason {}", - databaseIf.getFullName(), analysisInfo.tblId, analysisInfo.message, t); - continue; - } + protected Pair fetchOneJob() { + TableIf table = null; + JobPriority priority = null; + try { + table = highPriorityJobs.poll(1, TimeUnit.SECONDS); + priority = JobPriority.HIGH; + } catch (InterruptedException e) { + LOG.debug(e); } - } - - protected List constructAnalysisInfo(DatabaseIf db) { - List analysisInfos = new ArrayList<>(); - for (TableIf table : getTablesInOrder(db)) { + if (table == null) { try { - if (skip(table)) { - continue; - } - createAnalyzeJobForTbl(db, analysisInfos, table); - } catch (Throwable t) { - LOG.warn("Failed to analyze table {}.{}.{}", - db.getCatalog().getName(), db.getFullName(), table.getName(), t); + table = lowPriorityJobs.poll(1, TimeUnit.SECONDS); + priority = JobPriority.LOW; + } catch (InterruptedException e) { + LOG.debug(e); } } - return analysisInfos; - } - - // return true if skip auto analyze this time. - protected boolean skip(TableIf table) { - if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) { - return true; - } - // For now, only support Hive HMS table auto collection. 
- if (table instanceof HMSExternalTable - && !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { - return true; - } - if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) { - return false; - } - TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); - // means it's never got analyzed or new partition loaded data. - if (tableStats == null || tableStats.newPartitionLoaded.get()) { - return false; + if (table == null) { + LOG.debug("Job queues are all empty."); } - if (tableStats.userInjected) { - return true; - } - return System.currentTimeMillis() - - tableStats.updatedTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis(); + return Pair.of(table, priority); } - protected void createAnalyzeJobForTbl(DatabaseIf db, - List analysisInfos, TableIf table) { - AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; - if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { - OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { - LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); - return; - } + protected void processOneJob(TableIf table, JobPriority priority) + throws DdlException, ExecutionException, InterruptedException { + List> needRunColumns = table.getColumnIndexPairs( + table.getSchemaAllIndexes(false) + .stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(Column::getName) + .collect(Collectors.toSet())); + if (needRunColumns == null || needRunColumns.isEmpty()) { + return; + } + AnalysisMethod analysisMethod = + table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() + ? 
AnalysisMethod.SAMPLE : AnalysisMethod.FULL; + if (!tableRowCountReported(table, analysisMethod)) { + return; } // We don't auto analyze empty table to avoid all 0 stats. // Because all 0 is more dangerous than unknown stats when row count report is delayed. @@ -198,12 +151,33 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, } return; } + StringJoiner stringJoiner = new StringJoiner(",", "[", "]"); + for (Pair pair : needRunColumns) { + stringJoiner.add(pair.toString()); + } + AnalysisInfo jobInfo = createAnalysisInfo(table, analysisMethod, rowCount, + stringJoiner.toString(), needRunColumns, priority); + executeSystemAnalysisJob(jobInfo); + } + + protected boolean tableRowCountReported(TableIf table, AnalysisMethod analysisMethod) { + if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { + OlapTable ot = (OlapTable) table; + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { + LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); + return false; + } + } + return true; + } + + protected AnalysisInfo createAnalysisInfo(TableIf table, AnalysisMethod analysisMethod, long rowCount, + String colNames, List> needRunColumns, JobPriority priority) { AnalysisInfo jobInfo = new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) - .setCatalogId(db.getCatalog().getId()) - .setDBId(db.getId()) + .setCatalogId(table.getDatabase().getCatalog().getId()) + .setDBId(table.getDatabase().getId()) .setTblId(table.getId()) - .setColName(null) .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS) .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL) .setAnalysisMethod(analysisMethod) @@ -216,37 +190,46 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, .setJobType(JobType.SYSTEM) .setTblUpdateTime(table.getUpdateTime()) .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0 - && analysisMethod.equals(AnalysisMethod.SAMPLE)) + && 
analysisMethod.equals(AnalysisMethod.SAMPLE)) .setRowCount(rowCount) + .setColName(colNames) + .setJobColumns(needRunColumns) + .setPriority(priority) + .setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0) .build(); - analysisInfos.add(jobInfo); + return jobInfo; } - @VisibleForTesting - protected AnalysisInfo getNeedAnalyzeColumns(AnalysisInfo jobInfo) { - TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId); - // Skip tables that are too wide. - if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) { - return null; + // Analysis job created by the system + protected void executeSystemAnalysisJob(AnalysisInfo jobInfo) + throws DdlException, ExecutionException, InterruptedException { + if (jobInfo.jobColumns.isEmpty()) { + // No statistics need to be collected or updated + return; } - - AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); - TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); - - List> needRunColumns = null; - if (table.needReAnalyzeTable(tblStats)) { - needRunColumns = table.getColumnIndexPairs(table.getSchemaAllIndexes(false) - .stream().map(Column::getName).collect(Collectors.toSet())); + Map analysisTasks = new HashMap<>(); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); + if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { + analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); } - - if (needRunColumns == null || needRunColumns.isEmpty()) { - return null; + Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); + Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); + Future[] futures = new Future[analysisTasks.values().size()]; + int i = 0; + for (BaseAnalysisTask 
task : analysisTasks.values()) { + futures[i++] = analysisTaskExecutor.submitTask(task); } - StringJoiner stringJoiner = new StringJoiner(",", "[", "]"); - for (Pair pair : needRunColumns) { - stringJoiner.add(pair.toString()); + for (Future future : futures) { + future.get(); } - return new AnalysisInfoBuilder(jobInfo) - .setColName(stringJoiner.toString()).setJobColumns(needRunColumns).build(); + } + + public void appendToHighPriorityJobs(TableIf table) throws InterruptedException { + highPriorityJobs.put(table); + } + + public boolean appendToLowPriorityJobs(TableIf table) { + return lowPriorityJobs.offer(table); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java deleted file mode 100644 index ec187fe893af49a..000000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.hudi.common.util.VisibleForTesting; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.HashMap; -import java.util.Map; - -public abstract class StatisticsCollector extends MasterDaemon { - - private static final Logger LOG = LogManager.getLogger(StatisticsCollector.class); - - protected final AnalysisTaskExecutor analysisTaskExecutor; - - public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { - super(name, intervalMs); - this.analysisTaskExecutor = analysisTaskExecutor; - } - - @Override - protected void runAfterCatalogReady() { - if (!Env.getCurrentEnv().isMaster()) { - return; - } - if (!StatisticsUtil.statsTblAvailable()) { - LOG.info("Stats table not available, skip"); - return; - } - if (Env.isCheckpointThread()) { - return; - } - collect(); - } - - protected abstract void collect(); - - // Analysis job created by the system - @VisibleForTesting - protected void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - if (jobInfo.jobColumns.isEmpty()) { - // No statistics need to be collected or updated - return; - } - Map analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { - analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false); - } - Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values()); - Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks); - 
analysisTasks.values().forEach(analysisTaskExecutor::submitTask); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java new file mode 100644 index 000000000000000..3703cf236c03dae --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class StatisticsJobAppender extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(StatisticsJobAppender.class); + + public StatisticsJobAppender(String name) { + super(name, TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes)); + } + + @Override + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + if (!StatisticsUtil.statsTblAvailable()) { + LOG.info("Stats table not available, skip"); + return; + } + if (Env.getCurrentEnv().getStatisticsAutoCollector() == null) { + LOG.info("Statistics auto collector not ready, skip"); + return; + } + if (Env.isCheckpointThread()) { + return; + } + if (!Env.getCurrentEnv().isReady()) { + return; + } + if (!StatisticsUtil.canCollect()) { + LOG.debug("Auto analyze not enabled or not in analyze time range."); + return; + } + traverseAllTables(); + } + + protected void traverseAllTables() { + List catalogs = getCatalogsInOrder(); + AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); + StatisticsAutoCollector autoCollector = Env.getCurrentEnv().getStatisticsAutoCollector(); + for (CatalogIf ctl : catalogs) { + if (!StatisticsUtil.canCollect()) { + break; + } + if (!ctl.enableAutoAnalyze()) { + continue; + } + List dbs = getDatabasesInOrder(ctl); + for (DatabaseIf db : dbs) { 
+ if (!StatisticsUtil.canCollect()) { + break; + } + if (StatisticConstants.SYSTEM_DBS.contains(db.getFullName())) { + continue; + } + for (TableIf table : getTablesInOrder(db)) { + try { + if (skip(table)) { + continue; + } + TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); + if (table.needReAnalyzeTable(tblStats)) { + autoCollector.appendToHighPriorityJobs(table); + } else if (StatisticsUtil.tableNotAnalyzedForTooLong(table, tblStats)) { + autoCollector.appendToLowPriorityJobs(table); + } + } catch (Throwable t) { + LOG.warn("Failed to analyze table {}.{}.{}", + ctl.getName(), db.getFullName(), table.getName(), t); + } + } + } + } + } + + public List getCatalogsInOrder() { + return Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog().stream() + .sorted((c1, c2) -> (int) (c1.getId() - c2.getId())).collect(Collectors.toList()); + } + + public List> getDatabasesInOrder(CatalogIf catalog) { + return catalog.getAllDbs().stream() + .sorted((d1, d2) -> (int) (d1.getId() - d2.getId())).collect(Collectors.toList()); + } + + public List getTablesInOrder(DatabaseIf db) { + return db.getTables().stream() + .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); + } + + // return true if skip auto analyze this time. + protected boolean skip(TableIf table) { + if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) { + return true; + } + // For now, only support Hive HMS table auto collection. + if (table instanceof HMSExternalTable + && !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + return true; + } + // Skip wide table. 
+ if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) { + return true; + } + if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) { + return false; + } + TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); + // means it's never got analyzed or new partition loaded data. + if (tableStats == null || tableStats.newPartitionLoaded.get()) { + return false; + } + if (tableStats.userInjected) { + return true; + } + return System.currentTimeMillis() + - tableStats.lastAnalyzeTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 50739e98aea3e55..33f099e2b0ba032 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -79,6 +79,9 @@ public class TableStatsMeta implements Writable, GsonPostProcessable { @SerializedName("updateTime") public long updatedTime; + @SerializedName("lat") + public long lastAnalyzeTime; + @SerializedName("colNameToColStatsMeta") private ConcurrentMap deprecatedColNameToColStatsMeta = new ConcurrentHashMap<>(); @@ -156,19 +159,21 @@ public Set> analyzeColumns() { public void update(AnalysisInfo analyzedJob, TableIf tableIf) { updatedTime = analyzedJob.tblUpdateTime; + lastAnalyzeTime = analyzedJob.createTime; if (analyzedJob.userInject) { userInjected = true; } for (Pair colPair : analyzedJob.jobColumns) { ColStatsMeta colStatsMeta = colToColStatsMeta.get(colPair); if (colStatsMeta == null) { - colToColStatsMeta.put(colPair, new ColStatsMeta(updatedTime, - analyzedJob.analysisMethod, analyzedJob.analysisType, analyzedJob.jobType, 0)); + colToColStatsMeta.put(colPair, new ColStatsMeta(lastAnalyzeTime, analyzedJob.analysisMethod, + 
analyzedJob.analysisType, analyzedJob.jobType, 0, analyzedJob.tableVersion)); } else { - colStatsMeta.updatedTime = updatedTime; + colStatsMeta.updatedTime = lastAnalyzeTime; colStatsMeta.analysisType = analyzedJob.analysisType; colStatsMeta.analysisMethod = analyzedJob.analysisMethod; colStatsMeta.jobType = analyzedJob.jobType; + colStatsMeta.tableVersion = analyzedJob.tableVersion; } } jobType = analyzedJob.jobType; @@ -233,4 +238,9 @@ protected void addIndexRowForTest(long indexId, long rowCount) { public boolean isColumnsStatsEmpty() { return colToColStatsMeta == null || colToColStatsMeta.isEmpty(); } + + @VisibleForTesting + public void setColToColStatsMeta(ConcurrentMap, ColStatsMeta> colToColStatsMeta) { + this.colToColStatsMeta = colToColStatsMeta; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 288eb88e95fc238..718260a25b00873 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -51,6 +51,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.InternalCatalog; @@ -70,6 +71,7 @@ import org.apache.doris.statistics.Histogram; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticConstants; +import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.system.Frontend; import com.google.common.base.Preconditions; @@ -904,4 +906,37 @@ public static boolean isEmptyTable(TableIf table, AnalysisInfo.AnalysisMethod me return rowCount == 0; } + public static boolean canCollect() { + return enableAutoAnalyze() && 
inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); + } + + public static boolean tableNotAnalyzedForTooLong(TableIf table, TableStatsMeta tblStats) { + if (table == null || tblStats == null) { + LOG.warn("Table or stats is null."); + return false; + } + if (tblStats.userInjected) { + return false; + } + if (!(table instanceof OlapTable)) { + return false; + } + boolean isLongTime = Config.auto_analyze_interval_seconds > 0 + && System.currentTimeMillis() - tblStats.lastAnalyzeTime > Config.auto_analyze_interval_seconds * 1000; + if (!isLongTime) { + return false; + } + // For OlapTable, if update rows is 0, row count doesn't change since last analyze + // and table visible version doesn't change since last analyze. Then we skip analyzing it. + if (tblStats.updatedRows.get() != 0) { + return true; + } + long rowCount = table.getRowCount(); + if (rowCount != tblStats.rowCount) { + return true; + } + long visibleVersion = ((OlapTable) table).getVisibleVersion(); + return tblStats.analyzeColumns().stream() + .anyMatch(c -> tblStats.findColumnStatsMeta(c.first, c.second).tableVersion != visibleVersion); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index 4f40071f501e186..d687d111d2c103e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -17,504 +17,102 @@ package org.apache.doris.statistics; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.EnvFactory; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; 
-import org.apache.doris.catalog.Type; -import org.apache.doris.catalog.View; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.InternalCatalog; -import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; -import com.google.common.collect.Lists; -import mockit.Expectations; -import mockit.Injectable; import mockit.Mock; import mockit.MockUp; -import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.time.LocalTime; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutionException; public class StatisticsAutoCollectorTest { @Test - public void testAnalyzeAll(@Injectable AnalysisInfo analysisInfo) { - new MockUp() { - @Mock - public Collection getAllDbs() { - Database db1 = new Database(1, FeConstants.INTERNAL_DB_NAME); - Database db2 = new Database(2, "anyDB"); - List databaseIfs = new ArrayList<>(); - databaseIfs.add(db1); - databaseIfs.add(db2); - return databaseIfs; - } - }; + public void testCollect() throws DdlException, ExecutionException, InterruptedException { + StatisticsAutoCollector collector = new StatisticsAutoCollector(); + final int[] count = {0, 0}; new MockUp() { @Mock - public List constructAnalysisInfo(DatabaseIf db) { - return Arrays.asList(analysisInfo, analysisInfo); - } - - int count = 0; - - @Mock - public AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { - return count++ == 0 ? 
null : jobInfo; - } - - @Mock - public void createSystemAnalysisJob(AnalysisInfo jobInfo) - throws DdlException { - - } - }; - - StatisticsAutoCollector saa = new StatisticsAutoCollector(); - saa.runAfterCatalogReady(); - new Expectations() { - { - try { - saa.createSystemAnalysisJob((AnalysisInfo) any); - times = 1; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - } - - @Test - public void testConstructAnalysisInfo( - @Injectable OlapTable o2, @Injectable View v) { - new MockUp() { - @Mock - public List getTables() { - List
tableIfs = new ArrayList<>(); - tableIfs.add(o2); - tableIfs.add(v); - return tableIfs; - } - - @Mock - public String getFullName() { - return "anyDb"; - } - }; - - new MockUp() { - @Mock - public String getName() { - return "anytable"; - } - - @Mock - public List getSchemaAllIndexes(boolean full) { - List columns = new ArrayList<>(); - columns.add(new Column("c1", PrimitiveType.INT)); - columns.add(new Column("c2", PrimitiveType.HLL)); - return columns; - } - - @Mock - public long getRowCount() { - return 1; - } - }; - StatisticsAutoCollector saa = new StatisticsAutoCollector(); - List analysisInfoList = saa.constructAnalysisInfo(new Database(1, "anydb")); - Assertions.assertEquals(1, analysisInfoList.size()); - Assertions.assertNull(analysisInfoList.get(0).colName); - } - - @Test - public void testSkipWideTable() { - - TableIf tableIf = new OlapTable(); - - new MockUp() { - @Mock - public List getBaseSchema() { - return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT)); - } - - @Mock - public List> getColumnIndexPairs(Set columns) { - ArrayList> list = Lists.newArrayList(); - list.add(Pair.of("1", "1")); - return list; + protected Pair fetchOneJob() { + count[0]++; + return Pair.of(null, JobPriority.LOW); } }; - new MockUp() { - int count = 0; - int[] thresholds = {1, 10}; - - @Mock - public TableIf findTable(long catalogName, long dbName, long tblName) { - return tableIf; - } - - @Mock - public int getAutoAnalyzeTableWidthThreshold() { - return thresholds[count++]; - } - }; - - AnalysisInfo analysisInfo = new AnalysisInfoBuilder().build(); - StatisticsAutoCollector statisticsAutoCollector = new StatisticsAutoCollector(); - Assertions.assertNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo)); - Assertions.assertNotNull(statisticsAutoCollector.getNeedAnalyzeColumns(analysisInfo)); - } - - @Test - public void testLoop() { - AtomicBoolean timeChecked = new AtomicBoolean(); - AtomicBoolean switchChecked = new 
AtomicBoolean(); - new MockUp() { - - @Mock - public boolean inAnalyzeTime(LocalTime now) { - timeChecked.set(true); - return true; - } - - @Mock - public boolean enableAutoAnalyze() { - switchChecked.set(true); - return true; - } - }; - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - autoCollector.collect(); - Assertions.assertTrue(timeChecked.get() && switchChecked.get()); - - } - - @Test - public void checkAvailableThread() { - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - Assertions.assertEquals(Config.auto_analyze_simultaneously_running_task_num, - autoCollector.analysisTaskExecutor.executors.getMaximumPoolSize()); - } - - @Test - public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) { - new MockUp() { - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000; - } - }; - - new MockUp() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return stats; - } - }; - // A very huge table has been updated recently, so we should skip it this time - stats.updatedTime = System.currentTimeMillis() - 1000; - stats.newPartitionLoaded = new AtomicBoolean(); - stats.newPartitionLoaded.set(true); - StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); - // Test new partition loaded data for the first time. Not skip. 
- Assertions.assertFalse(autoCollector.skip(olapTable)); - stats.newPartitionLoaded.set(false); - // Assertions.assertTrue(autoCollector.skip(olapTable)); - // The update of this huge table is long time ago, so we shouldn't skip it this time - stats.updatedTime = System.currentTimeMillis() - - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000; - Assertions.assertFalse(autoCollector.skip(olapTable)); new MockUp() { - @Mock public TableStatsMeta findTableStatsStatus(long tblId) { + count[1]++; return null; } }; - // can't find table stats meta, which means this table never get analyzed, so we shouldn't skip it this time - Assertions.assertFalse(autoCollector.skip(olapTable)); - new MockUp() { - - @Mock - public TableStatsMeta findTableStatsStatus(long tblId) { - return stats; - } - }; - stats.userInjected = true; - Assertions.assertTrue(autoCollector.skip(olapTable)); - // this is not olap table nor external table, so we should skip it this time - Assertions.assertTrue(autoCollector.skip(anyOtherTable)); - } - - // For small table, use full - @Test - public void testCreateAnalyzeJobForTbl1( - @Injectable OlapTable t1, - @Injectable Database db - ) throws Exception { - new MockUp() { - - @Mock - public CatalogIf getCatalog() { - return Env.getCurrentInternalCatalog(); - } - - @Mock - public long getId() { - return 0; - } - }; - new MockUp() { - - int count = 0; - - @Mock - public List getBaseSchema() { - return Lists.newArrayList(new Column("test", PrimitiveType.INT)); - } - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() - 1; - } - - @Mock - public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { - return new OlapAnalysisTask(info); - } - - @Mock - public List getMvColumnIndexIds(String columnName) { - ArrayList objects = new ArrayList<>(); - objects.add(-1L); - return objects; - } - - @Mock - public long getRowCount() { - return 1; - } - }; - - new MockUp() { - @Mock - 
public TableIf findTable(long catalogId, long dbId, long tblId) { - return t1; - } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - List jobInfos = new ArrayList<>(); - sac.createAnalyzeJobForTbl(db, jobInfos, t1); - AnalysisInfo jobInfo = jobInfos.get(0); - List> columnNames = Lists.newArrayList(); - columnNames.add(Pair.of("test", "t1")); - jobInfo = new AnalysisInfoBuilder(jobInfo).setJobColumns(columnNames).build(); - Map analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - Assertions.assertEquals(1, analysisTasks.size()); - for (BaseAnalysisTask task : analysisTasks.values()) { - Assertions.assertNull(task.getTableSample()); - } - } - - // for big table, use sample - @Test - public void testCreateAnalyzeJobForTbl2( - @Injectable OlapTable t1, - @Injectable Database db - ) throws Exception { - new MockUp() { - - @Mock - public CatalogIf getCatalog() { - return Env.getCurrentInternalCatalog(); - } - - @Mock - public long getId() { - return 0; - } - }; - new MockUp() { - - int count = 0; - - @Mock - public List getBaseSchema() { - return Lists.newArrayList(new Column("test", PrimitiveType.INT)); - } - - @Mock - public long getDataSize(boolean singleReplica) { - return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 2; - } - - @Mock - public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { - return new OlapAnalysisTask(info); - } - - @Mock - public List getMvColumnIndexIds(String columnName) { - ArrayList objects = new ArrayList<>(); - objects.add(-1L); - return objects; - } - - @Mock - public long getRowCount() { - return 1; - } - }; - - new MockUp() { - @Mock - public TableIf findTable(long catalogId, long dbId, long tblId) { - return t1; - } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - List jobInfos = new ArrayList<>(); - sac.createAnalyzeJobForTbl(db, jobInfos, t1); 
- AnalysisInfo jobInfo = jobInfos.get(0); - List> colNames = Lists.newArrayList(); - colNames.add(Pair.of("test", "1")); - jobInfo = new AnalysisInfoBuilder(jobInfo).setJobColumns(colNames).build(); - Map analysisTasks = new HashMap<>(); - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); - analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); - Assertions.assertEquals(1, analysisTasks.size()); - for (BaseAnalysisTask task : analysisTasks.values()) { - Assertions.assertNotNull(task.getTableSample()); - } - } - - @Test - public void testDisableAuto1() throws Exception { - InternalCatalog catalog1 = EnvFactory.createInternalCatalog(); - List catalogs = Lists.newArrayList(); - catalogs.add(catalog1); + collector.collect(); + Assertions.assertEquals(1, count[0]); + Assertions.assertEquals(0, count[1]); + OlapTable table = new OlapTable(); new MockUp() { @Mock - public List getCatalogsInOrder() { - return catalogs; - } - - @Mock - protected boolean canCollect() { - return false; + protected Pair fetchOneJob() { + if (count[0] == 0) { + count[0]++; + return Pair.of(table, JobPriority.LOW); + } + count[0]++; + return Pair.of(null, JobPriority.LOW); } - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - new Expectations(catalog1) {{ - catalog1.enableAutoAnalyze(); - times = 0; - }}; - - sac.analyzeAll(); + count[0] = 0; + count[1] = 0; + collector.collect(); + Assertions.assertEquals(2, count[0]); + Assertions.assertEquals(1, count[1]); } @Test - public void testDisableAuto2() throws Exception { - InternalCatalog catalog1 = EnvFactory.createInternalCatalog(); - List catalogs = Lists.newArrayList(); - catalogs.add(catalog1); - - Database db1 = new Database(); - List> dbs = Lists.newArrayList(); - dbs.add(db1); - - new MockUp() { - int count = 0; - boolean[] canCollectReturn = {true, false}; - @Mock - public List getCatalogsInOrder() { - return catalogs; - } - - @Mock - public List> getDatabasesInOrder(CatalogIf 
catalog) { - return dbs; - } - - @Mock - protected boolean canCollect() { - return canCollectReturn[count++]; - } - - }; - - StatisticsAutoCollector sac = new StatisticsAutoCollector(); - new Expectations(catalog1, db1) {{ - catalog1.enableAutoAnalyze(); - result = true; - times = 1; - db1.getFullName(); - times = 0; - }}; - - sac.analyzeAll(); + public void testFetchOneJob() throws InterruptedException { + OlapTable table1 = new OlapTable(); + OlapTable table2 = new OlapTable(); + StatisticsAutoCollector collector = new StatisticsAutoCollector(); + collector.appendToHighPriorityJobs(table1); + collector.appendToLowPriorityJobs(table2); + Pair jobPair = collector.fetchOneJob(); + Assertions.assertSame(table1, jobPair.first); + Assertions.assertEquals(JobPriority.HIGH, jobPair.second); + jobPair = collector.fetchOneJob(); + Assertions.assertSame(table2, jobPair.first); + Assertions.assertEquals(JobPriority.LOW, jobPair.second); + jobPair = collector.fetchOneJob(); + Assertions.assertNull(jobPair.first); } @Test - public void testCreateAnalyzeJobForTbl() { + public void testTableRowCountReported() { StatisticsAutoCollector collector = new StatisticsAutoCollector(); - OlapTable table = new OlapTable(); + ExternalTable externalTable = new ExternalTable(); + Assertions.assertTrue(collector.tableRowCountReported(externalTable, AnalysisMethod.SAMPLE)); + OlapTable olapTable = new OlapTable(); + Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.FULL)); + Assertions.assertTrue(collector.tableRowCountReported(externalTable, AnalysisMethod.FULL)); new MockUp() { - @Mock - public long getDataSize(boolean singleReplica) { - return 100; - } - @Mock public long getRowCountForIndex(long indexId, boolean strict) { - return -1; - } - - @Mock - public boolean isPartitionedTable() { - return false; + return TableIf.UNKNOWN_ROW_COUNT; } }; - List infos = Lists.newArrayList(); - collector.createAnalyzeJobForTbl(null, infos, table); - 
Assertions.assertEquals(0, infos.size()); + Assertions.assertFalse(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE)); new MockUp() { @Mock public long getRowCountForIndex(long indexId, boolean strict) { - return 100; + return TableIf.UNKNOWN_ROW_COUNT + 1; } }; - Assertions.assertThrows(NullPointerException.class, () -> collector.createAnalyzeJobForTbl(null, infos, table)); + Assertions.assertTrue(collector.tableRowCountReported(olapTable, AnalysisMethod.SAMPLE)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java new file mode 100644 index 000000000000000..f57791a1a5c6bd2 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsJobAppenderTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.statistics.util.StatisticsUtil; + +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class StatisticsJobAppenderTest { + @Test + public void testSkip(@Mocked OlapTable olapTable, @Mocked TableStatsMeta stats, @Mocked TableIf anyOtherTable) { + new MockUp() { + + @Mock + public long getDataSize(boolean singleReplica) { + return StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5 + 1000000000; + } + }; + + new MockUp() { + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return stats; + } + }; + // A very huge table has been updated recently, so we should skip it this time + stats.updatedTime = System.currentTimeMillis() - 1000; + stats.newPartitionLoaded = new AtomicBoolean(); + stats.newPartitionLoaded.set(true); + StatisticsJobAppender appender = new StatisticsJobAppender("appender"); + // Test new partition loaded data for the first time. Not skip. 
+ Assertions.assertFalse(appender.skip(olapTable)); + stats.newPartitionLoaded.set(false); + // The update of this huge table is long time ago, so we shouldn't skip it this time + stats.updatedTime = System.currentTimeMillis() + - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000; + Assertions.assertFalse(appender.skip(olapTable)); + new MockUp() { + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return null; + } + }; + // can't find table stats meta, which means this table never get analyzed, so we shouldn't skip it this time + Assertions.assertFalse(appender.skip(olapTable)); + new MockUp() { + + @Mock + public TableStatsMeta findTableStatsStatus(long tblId) { + return stats; + } + }; + stats.userInjected = true; + Assertions.assertTrue(appender.skip(olapTable)); + + // this is not olap table nor external table, so we should skip it this time + Assertions.assertTrue(appender.skip(anyOtherTable)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java index 724e0363833305f..0e221673e9d2c23 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/util/StatisticsUtilTest.java @@ -17,10 +17,20 @@ package org.apache.doris.statistics.util; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.datasource.ExternalTable; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.ColStatsMeta; 
import org.apache.doris.statistics.ResultRow; +import org.apache.doris.statistics.TableStatsMeta; import com.google.common.collect.Lists; import mockit.Mock; @@ -33,6 +43,8 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Base64; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; class StatisticsUtilTest { @Test @@ -150,4 +162,77 @@ void testEscape() { // \\''"" Assertions.assertEquals("\\\\''\"", StatisticsUtil.escapeSQL(origin)); }

/**
 * Checks the result of {@code StatisticsUtil.tableNotAnalyzedForTooLong} for each scenario
 * set up below: null arguments, user-injected stats, an external table, an interval of 0,
 * an interval that has not elapsed, and - with the interval elapsed - updated rows,
 * a row count that differs from the recorded one, and matching / non-matching versions.
 *
 * <p>NOTE(review): the generic type arguments on {@code MockUp} and {@code ConcurrentMap}
 * were missing from the text under review. They are restored here from the types this
 * change imports ({@code Table}, {@code OlapTable}, {@code Pair}) and from the names of
 * the mocked methods - confirm against the upstream file.
 *
 * @throws InterruptedException no longer thrown by the body; kept so the signature is unchanged
 */
@Test
public void testTableNotAnalyzedForTooLong() throws InterruptedException {
    // The scenarios overwrite a static config field. Remember its value so it can be put
    // back afterwards instead of leaking (as 1 second) into tests that run later.
    final long originalInterval = Config.auto_analyze_interval_seconds;
    try {
        TableStatsMeta tableMeta = new TableStatsMeta();
        OlapTable olapTable = new OlapTable();
        ExternalTable externalTable = new ExternalTable();

        // Test table or stats is null
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(null, tableMeta));
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, null));

        // Test user injected
        tableMeta.userInjected = true;
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));

        // Test external table
        tableMeta.userInjected = false;
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(externalTable, tableMeta));

        // Test config is 0
        Config.auto_analyze_interval_seconds = 0;
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));

        // Test time not long enough
        Config.auto_analyze_interval_seconds = 86400;
        tableMeta.lastAnalyzeTime = System.currentTimeMillis();
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));

        // Test time long enough and update rows > 0.
        // Backdate the last analyze by 2s instead of sleeping for 2s: the time elapsed
        // since lastAnalyzeTime is the same, but the test no longer blocks.
        // NOTE(review): assumes the check compares lastAnalyzeTime with the current clock.
        Config.auto_analyze_interval_seconds = 1;
        tableMeta.lastAnalyzeTime = System.currentTimeMillis() - 2000;
        tableMeta.updatedRows.set(10);
        Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));

        // Test row count differs from the one recorded in the stats (100 vs 10)
        tableMeta.updatedRows.set(0);
        tableMeta.rowCount = 10;
        new MockUp<Table>() {
            @Mock
            public long getRowCount() {
                return 100;
            }
        };
        Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));

        // Test visible version changed: row count now matches again (10), the mocked
        // visible version is 100, and one column carries 101 as its last constructor
        // argument (presumably the table version recorded at analyze time).
        new MockUp<OlapTable>() {
            @Mock
            public long getVisibleVersion() {
                return 100;
            }
        };
        new MockUp<Table>() {
            @Mock
            public long getRowCount() {
                return 10;
            }
        };
        ConcurrentMap<Pair<String, String>, ColStatsMeta> colToColStatsMeta = new ConcurrentHashMap<>();
        ColStatsMeta col1Meta = new ColStatsMeta(
                0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100);
        ColStatsMeta col2Meta = new ColStatsMeta(
                0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 101);
        colToColStatsMeta.put(Pair.of("index1", "col1"), col1Meta);
        colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta);
        tableMeta.setColToColStatsMeta(colToColStatsMeta);
        Assertions.assertTrue(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));

        // Test visible version unchanged: both columns now carry 100.
        col2Meta = new ColStatsMeta(
                0, AnalysisMethod.SAMPLE, AnalysisType.FUNDAMENTALS, JobType.SYSTEM, 0, 100);
        colToColStatsMeta.put(Pair.of("index2", "col2"), col2Meta);
        tableMeta.setColToColStatsMeta(colToColStatsMeta);
        Assertions.assertFalse(StatisticsUtil.tableNotAnalyzedForTooLong(olapTable, tableMeta));
    } finally {
        Config.auto_analyze_interval_seconds = originalInterval;
    }
}
} diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index b4edc5e9d7bdc9e..280b08f1e74efbe 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -2941,7 +2941,38 @@ PARTITION `p599` VALUES IN (599) new_part_result = sql """show column stats part(colint)""" assertEquals("2.0", new_part_result[0][2]) - sql """DROP DATABASE IF EXISTS trigger""" + + // Test show last analyze table version + sql """create database if not exists test_version""" + sql """use test_version""" + sql """drop table if exists region""" + sql """ + CREATE TABLE region ( + r_regionkey int NOT NULL, + r_name VARCHAR(25) NOT NULL, + r_comment VARCHAR(152) + )ENGINE=OLAP + DUPLICATE KEY(`r_regionkey`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """analyze table region with sync""" + 
def versionResult = sql """show column stats region""" + assertEquals(versionResult[0][14], "1") + assertEquals(versionResult[1][14], "1") + assertEquals(versionResult[2][14], "1") + + sql """insert into region values (1, "1", "1")""" + sql """analyze table region with sync""" + versionResult = sql """show column stats region""" + assertEquals(versionResult[0][14], "2") + assertEquals(versionResult[1][14], "2") + assertEquals(versionResult[2][14], "2") + + sql """drop database if exists test_version""" }