[Improvement](statistics) Improve sample count accuracy (apache#25175)
When doing a sample analyze, the resulting row count, null count, and data size need to be multiplied by a coefficient based on the sample percent/rows. This PR derives that coefficient from the ratio of the sampled file size to the table's total size.
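For example, if the sample reads 2 GB of a 10 GB table, the coefficient is 10 / 2 = 5, so a sampled row count of 1,000,000 is scaled to an estimated 5,000,000 rows. A minimal sketch of the scaling, using illustrative names rather than the PR's actual identifiers:

    // Illustrative only: scale sampled results by total size over sampled size.
    double coefficient = (double) totalFileSize / sampledFileSize; // 10 GB / 2 GB = 5.0
    long estimatedRowCount = (long) (sampledRowCount * coefficient);
    long estimatedNullCount = (long) (sampledNullCount * coefficient);
    long estimatedDataSize = (long) (sampledDataSize * coefficient);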
Jibing-Li authored Oct 12, 2023
1 parent 22684de commit c63bf24
Showing 19 changed files with 371 additions and 102 deletions.
@@ -21,7 +21,6 @@
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
 import org.apache.doris.catalog.external.ExternalTable;
@@ -167,11 +166,6 @@ public void check() throws AnalysisException {
         }
         analyzeProperties.check();
 
-        // TODO support external table
-        if (analyzeProperties.isSampleRows() && !(table instanceof OlapTable)) {
-            throw new AnalysisException("Sampling statistics "
-                    + "collection of external tables is not supported with rows, use percent instead.");
-        }
         if (analyzeProperties.isSync()
                 && (analyzeProperties.isAutomatic() || analyzeProperties.getPeriodTimeInMs() != 0)) {
             throw new AnalysisException("Automatic/Period statistics collection "
@@ -476,11 +476,6 @@ protected void analyzeSample() throws AnalysisException {
             throw new AnalysisException("Sample table " + desc.getTable().getName()
                     + " type " + desc.getTable().getType() + " is not supported");
         }
-        if (tableSample != null && TableIf.TableType.HMS_EXTERNAL_TABLE.equals(desc.getTable().getType())) {
-            if (!tableSample.isPercent()) {
-                throw new AnalysisException("HMS table doesn't support sample rows, use percent instead.");
-            }
-        }
     }
 
     /**
fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java (5 changes: 5 additions & 0 deletions)
@@ -583,4 +583,9 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
     public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
         return Collections.emptyMap();
     }
+
+    @Override
+    public List<Long> getChunkSizes() {
+        throw new NotImplementedException("getChunkSizes not implemented");
+    }
 }
@@ -142,6 +142,10 @@ default int getBaseColumnIdxByName(String colName) {
 
     Map<String, Set<String>> findReAnalyzeNeededPartitions();
 
+    // Get all the chunk sizes of this table. Currently, only the HMS external table implements this interface.
+    // For an HMS external table, the result is a list of all the table files' sizes.
+    List<Long> getChunkSizes();
+
     void write(DataOutput out) throws IOException;
 
     /**
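A caller can turn these chunk sizes into the scaling coefficient described in the commit message; a sketch under assumed names (table is a TableIf, and sampledBytes stands for the size of the files actually chosen for the sample):

    // Sketch: derive the scaling coefficient from chunk sizes (illustrative only).
    List<Long> chunks = table.getChunkSizes(); // one entry per file for HMS tables
    long totalBytes = chunks.stream().mapToLong(Long::longValue).sum();
    double coefficient = (totalBytes > 0 && sampledBytes > 0)
            ? (double) totalBytes / sampledBytes // > 1 when only part of the data was read
            : 1.0; // fall back to no scaling when sizes are unknown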
@@ -399,4 +399,9 @@ public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
         partitions.add("Dummy Partition");
         return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions));
     }
+
+    @Override
+    public List<Long> getChunkSizes() {
+        throw new NotImplementedException("getChunkSizes not implemented");
+    }
 }
@@ -26,6 +26,7 @@
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
@@ -644,6 +645,36 @@ public void gsonPostProcess() throws IOException {
         super.gsonPostProcess();
         estimatedRowCount = -1;
     }
+
+    @Override
+    public List<Long> getChunkSizes() {
+        HiveMetaStoreCache.HivePartitionValues partitionValues = StatisticsUtil.getPartitionValuesForTable(this);
+        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions
+                = StatisticsUtil.getFilesForPartitions(this, partitionValues, 0);
+        List<Long> result = Lists.newArrayList();
+        for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
+            for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) {
+                result.add(file.getLength());
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public long getDataSize(boolean singleReplica) {
+        long totalSize = StatisticsUtil.getTotalSizeFromHMS(this);
+        // Usually, we can get the total size from the HMS table parameters.
+        if (totalSize > 0) {
+            return totalSize;
+        }
+        // If the size is not available in HMS, calculate it by summing the sizes of all files in the table.
+        List<Long> chunkSizes = getChunkSizes();
+        long total = 0;
+        for (long size : chunkSizes) {
+            total += size;
+        }
+        return total;
+    }
 }
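Note that getDataSize(true) also feeds the auto-sampling decision later in this commit: BaseAnalysisTask.getTableSample() compares it against a configured bound. A condensed sketch of that consumer, reusing names from the diff further below:

    // Sketch: a table larger than the configured bound triggers auto-sampling.
    boolean autoSample = Config.enable_auto_sample
            && tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes;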


@@ -480,7 +480,12 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
                 break;
             case HIVE:
                 scanNode = new HiveScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
-                ((HiveScanNode) scanNode).setSelectedPartitions(fileScan.getSelectedPartitions());
+                HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
+                hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
+                if (fileScan.getTableSample().isPresent()) {
+                    hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
+                            fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
+                }
                 break;
             default:
                 throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
@@ -230,11 +230,13 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
                 Plan hiveViewPlan = parseAndAnalyzeHiveView(hiveCatalog, ddlSql, cascadesContext);
                 return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
             }
-            return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier);
+            return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier,
+                    unboundRelation.getTableSample());
         case ICEBERG_EXTERNAL_TABLE:
         case PAIMON_EXTERNAL_TABLE:
         case MAX_COMPUTE_EXTERNAL_TABLE:
-            return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table, tableQualifier);
+            return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table, tableQualifier,
+                    unboundRelation.getTableSample());
         case SCHEMA:
             return new LogicalSchemaScan(unboundRelation.getRelationId(), table, tableQualifier);
         case JDBC_EXTERNAL_TABLE:
@@ -39,7 +39,8 @@ public Rule build() {
                         Optional.empty(),
                         fileScan.getLogicalProperties(),
                         fileScan.getConjuncts(),
-                        fileScan.getSelectedPartitions())
+                        fileScan.getSelectedPartitions(),
+                        fileScan.getTableSample())
         ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE);
     }
 }
@@ -212,7 +212,7 @@ public Plan visitLogicalFileScan(LogicalFileScan fileScan, DeepCopierContext con
             return context.getRelationReplaceMap().get(fileScan.getRelationId());
         }
         LogicalFileScan newFileScan = new LogicalFileScan(StatementScopeIdGenerator.newRelationId(),
-                fileScan.getTable(), fileScan.getQualifier());
+                fileScan.getTable(), fileScan.getQualifier(), fileScan.getTableSample());
         updateLeadingRelationIdMap(newFileScan.getRelationId(), fileScan.getTable().getName(), newFileScan);
         updateReplaceMapWithOutput(fileScan, newFileScan, context.exprIdReplaceMap);
         context.putRelation(fileScan.getRelationId(), newFileScan);
@@ -21,6 +21,7 @@
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.TableSample;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -49,22 +50,26 @@ public class LogicalFileScan extends LogicalCatalogRelation {
     private final Set<Expression> conjuncts;
     @Getter
     private final SelectedPartitions selectedPartitions;
+    @Getter
+    private final Optional<TableSample> tableSample;
 
     /**
      * Constructor for LogicalFileScan.
      */
     public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
             Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
-            Set<Expression> conjuncts, SelectedPartitions selectedPartitions) {
+            Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample) {
         super(id, PlanType.LOGICAL_FILE_SCAN, table, qualifier,
                 groupExpression, logicalProperties);
         this.conjuncts = conjuncts;
         this.selectedPartitions = selectedPartitions;
+        this.tableSample = tableSample;
     }
 
-    public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier) {
+    public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
+            Optional<TableSample> tableSample) {
         this(id, table, qualifier, Optional.empty(), Optional.empty(),
-                Sets.newHashSet(), SelectedPartitions.NOT_PRUNED);
+                Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample);
     }
 
     @Override
@@ -85,24 +90,24 @@ public String toString() {
     @Override
     public LogicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
         return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
-                Optional.of(getLogicalProperties()), conjuncts, selectedPartitions);
+                Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> children) {
         return new LogicalFileScan(relationId, (ExternalTable) table, qualifier,
-                groupExpression, logicalProperties, conjuncts, selectedPartitions);
+                groupExpression, logicalProperties, conjuncts, selectedPartitions, tableSample);
     }
 
     public LogicalFileScan withConjuncts(Set<Expression> conjuncts) {
         return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
-                Optional.of(getLogicalProperties()), conjuncts, selectedPartitions);
+                Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
     }
 
     public LogicalFileScan withSelectedPartitions(SelectedPartitions selectedPartitions) {
         return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
-                Optional.of(getLogicalProperties()), conjuncts, selectedPartitions);
+                Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
     }
 
     @Override
@@ -22,6 +22,7 @@
 import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.TableSample;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -47,18 +48,21 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
     private final Set<Expression> conjuncts;
     @Getter
     private final SelectedPartitions selectedPartitions;
+    @Getter
+    private final Optional<TableSample> tableSample;
 
     /**
      * Constructor for PhysicalFileScan.
      */
     public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
             DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, Set<Expression> conjuncts,
-            SelectedPartitions selectedPartitions) {
+            SelectedPartitions selectedPartitions, Optional<TableSample> tableSample) {
         super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties);
         this.distributionSpec = distributionSpec;
         this.conjuncts = conjuncts;
         this.selectedPartitions = selectedPartitions;
+        this.tableSample = tableSample;
     }
 
     /**
@@ -67,12 +71,14 @@ public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifi
     public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
             DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
-            Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions) {
+            Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
+            Optional<TableSample> tableSample) {
         super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties,
                 physicalProperties, statistics);
         this.distributionSpec = distributionSpec;
         this.conjuncts = conjuncts;
         this.selectedPartitions = selectedPartitions;
+        this.tableSample = tableSample;
     }
 
     @Override
@@ -95,14 +101,14 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     @Override
     public PhysicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
         return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec,
-                groupExpression, getLogicalProperties(), conjuncts, selectedPartitions);
+                groupExpression, getLogicalProperties(), conjuncts, selectedPartitions, tableSample);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> children) {
         return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec,
-                groupExpression, logicalProperties.get(), conjuncts, selectedPartitions);
+                groupExpression, logicalProperties.get(), conjuncts, selectedPartitions, tableSample);
     }
 
@Override
Expand All @@ -115,6 +121,6 @@ public PhysicalFileScan withPhysicalPropertiesAndStats(PhysicalProperties physic
Statistics statistics) {
return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec,
groupExpression, getLogicalProperties(), physicalProperties, statistics, conjuncts,
selectedPartitions);
selectedPartitions, tableSample);
}
}
@@ -74,6 +74,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -95,6 +96,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected Map<String, SlotDescriptor> destSlotDescByName;
     protected TFileScanRangeParams params;
 
+    @Getter
     protected TableSample tableSample;
 
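The lombok @Getter added above generates a public accessor for the protected field, presumably for callers such as the planner; it is equivalent to hand-writing:

    // What lombok's @Getter generates for the field above (equivalent form).
    public TableSample getTableSample() {
        return tableSample;
    }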
@@ -66,6 +66,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.Collectors;
 
 public class HiveScanNode extends FileQueryScanNode {
@@ -263,9 +264,18 @@ private List<HiveMetaStoreCache.HiveFileStatus> selectFiles(List<FileCacheValue>
                 totalSize += file.getLength();
             }
         }
-        long sampleSize = totalSize * tableSample.getSampleValue() / 100;
+        long sampleSize = 0;
+        if (tableSample.isPercent()) {
+            sampleSize = totalSize * tableSample.getSampleValue() / 100;
+        } else {
+            long estimatedRowSize = 0;
+            for (Column column : hmsTable.getFullSchema()) {
+                estimatedRowSize += column.getDataType().getSlotSize();
+            }
+            sampleSize = estimatedRowSize * tableSample.getSampleValue();
+        }
         long selectedSize = 0;
-        Collections.shuffle(fileList);
+        Collections.shuffle(fileList, new Random(tableSample.getSeek()));
         int index = 0;
         for (HiveMetaStoreCache.HiveFileStatus file : fileList) {
             selectedSize += file.getLength();
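Two details in the hunk above are easy to miss: a row-based sample is converted into a byte budget using the schema's estimated row width, and the shuffle is now seeded with the sample's seek value so repeated runs select the same files. A self-contained sketch of the selection idea, with illustrative names (the real loop works on HiveFileStatus objects and its tail is truncated above):

    // Illustrative only: deterministically pick files until a byte budget is met.
    // Assumes java.util.{ArrayList, Collections, List, Random}.
    static List<Long> pickSampleFiles(List<Long> fileSizes, long sampleSize, long seed) {
        List<Long> files = new ArrayList<>(fileSizes);
        Collections.shuffle(files, new Random(seed)); // same seed -> same sample
        List<Long> picked = new ArrayList<>();
        long selectedSize = 0;
        for (long length : files) {
            picked.add(length);
            selectedSize += length;
            if (selectedSize >= sampleSize) {
                break; // budget reached
            }
        }
        return picked;
    }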
@@ -58,9 +58,7 @@ public class AnalysisInfoBuilder {
     private boolean samplingPartition;
     private boolean isAllPartition;
     private long partitionCount;
-
     private CronExpression cronExpression;
-
     private boolean forceFull;
 
     public AnalysisInfoBuilder() {
@@ -137,6 +137,7 @@ protected void init(AnalysisInfo info) {
                     info, AnalysisState.FAILED,
                     String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
         }
+        tableSample = getTableSample();
        // External Table level task doesn't contain a column. Don't need to do the column related analyze.
         if (info.externalTableLevelTask) {
             return;
@@ -150,8 +151,6 @@ protected void init(AnalysisInfo info) {
             Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
                     String.format("Column with type %s is not supported", col.getType().toString()));
         }
-        tableSample = getTableSample();
-
     }
 
     public void execute() {
@@ -230,19 +229,18 @@ protected TableSample getTableSample() {
         if (info.forceFull) {
             return null;
         }
-        long sampleRows = info.sampleRows;
-        if (info.analysisMethod == AnalysisMethod.FULL) {
-            if (Config.enable_auto_sample
-                    && tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) {
-                sampleRows = Config.huge_table_default_sample_rows;
-            } else {
-                return null;
-            }
-        }
+        // If the user specified a sample percent or sample rows, use it.
         if (info.samplePercent > 0) {
             return new TableSample(true, (long) info.samplePercent);
+        } else if (info.sampleRows > 0) {
+            return new TableSample(false, info.sampleRows);
+        } else if (info.analysisMethod == AnalysisMethod.FULL
+                && Config.enable_auto_sample
+                && tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) {
+            // If the user doesn't specify sample percent/rows, use auto sampling and update the sample rows in the analysis info.
+            return new TableSample(false, (long) Config.huge_table_default_sample_rows);
         } else {
-            return new TableSample(false, sampleRows);
+            return null;
         }
     }

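The rewritten method gives the sampling options a clear priority: an explicit sample percent wins, then explicit sample rows, then auto-sampling for huge tables under a full analyze, and otherwise no sampling. Hypothetical traces:

    // Hypothetical traces of getTableSample():
    // info.forceFull = true                   -> null (sampling disabled, full collection)
    // info.samplePercent = 10                 -> TableSample(true, 10)
    // info.sampleRows = 1000000               -> TableSample(false, 1000000)
    // FULL analyze, enable_auto_sample = true,
    //   dataSize > huge_table_lower_bound     -> TableSample(false, huge_table_default_sample_rows)
    // none of the above                       -> null (collect on the full data set)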