diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 935300dee6f2fd..6c6404eac1a591 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1429,10 +1429,34 @@ public class Config extends ConfigBase { @ConfField( mutable = true, masterOnly = false, - callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig" + callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig", + description = { + "当前默认设置为 300,用来控制控制NereidsSqlCacheManager中sql cache过期时间,超过一段时间不访问cache会被回收", + "The current default setting is 300, which is used to control the expiration time of SQL cache" + + " in NereidsSqlCacheManager. If the cache is not accessed for a period of time, " + + "it will be reclaimed" + } ) public static int expire_sql_cache_in_fe_second = 300; + + /** + * Expire time, in seconds, of the sorted partition metadata cache in frontend + */ + @ConfField( + mutable = true, + masterOnly = false, + callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig", + description = { + "当前默认设置为 300,用来控制控制NereidsSortedPartitionsCacheManager中分区元数据缓存过期时间," + + "超过一段时间不访问cache会被回收", + "The current default setting is 300, which is used to control the expiration time of " + + "the partition metadata cache in NereidsSortedPartitionsCacheManager. 
" + + "If the cache is not accessed for a period of time, it will be reclaimed" + } + ) + public static int expire_cache_partition_meta_table_in_fe_second = 300; + /** * Set the maximum number of rows that can be cached */ @@ -2275,8 +2299,7 @@ public class Config extends ConfigBase { */ @ConfField( mutable = true, - varType = VariableAnnotation.EXPERIMENTAL, - callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig", + callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig", description = { "当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。", "Now default set to 100, this config is used to control the number of " @@ -2285,6 +2308,19 @@ public class Config extends ConfigBase { ) public static int sql_cache_manage_num = 100; + @ConfField( + mutable = true, + callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig", + description = { + "当前默认设置为 100,用来控制控制NereidsSortedPartitionsCacheManager中有序分区元数据的缓存个数," + + "用于加速分区裁剪", + "The current default setting is 100, which is used to control the number of ordered " + + "partition metadata caches in NereidsSortedPartitionsCacheManager, " + + "and to accelerate partition pruning" + } + ) + public static int cache_partition_meta_table_manage_num = 100; + /** * Maximum number of events to poll in each RPC. 
*/ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java b/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java index 18ae1dc1c0171f..7181921792572e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java @@ -72,7 +72,7 @@ public interface ConfHandler { void handle(Field field, String confVal) throws Exception; } - static class DefaultConfHandler implements ConfHandler { + public static class DefaultConfHandler implements ConfHandler { @Override public void handle(Field field, String confVal) throws Exception { setConfigField(field, confVal); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1c6345613d768d..e2a7003f472c16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -107,10 +107,11 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LogUtils; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.NereidsSqlCacheManager; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; +import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager; +import org.apache.doris.common.cache.NereidsSqlCacheManager; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; import org.apache.doris.common.lock.MonitoredReentrantLock; @@ -567,6 +568,8 @@ public class Env { private final NereidsSqlCacheManager sqlCacheManager; + private final NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager; + private final SplitSourceManager splitSourceManager; private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr; @@ -819,6 +822,7 @@ public Env(boolean 
isCheckpointCatalog) { this.insertOverwriteManager = new InsertOverwriteManager(); this.dnsCache = new DNSCache(); this.sqlCacheManager = new NereidsSqlCacheManager(); + this.sortedPartitionsCacheManager = new NereidsSortedPartitionsCacheManager(); this.splitSourceManager = new SplitSourceManager(); this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr(); this.tokenManager = new TokenManager(); @@ -6659,6 +6663,10 @@ public NereidsSqlCacheManager getSqlCacheManager() { return sqlCacheManager; } + public NereidsSortedPartitionsCacheManager getSortedPartitionsCacheManager() { + return sortedPartitionsCacheManager; + } + public SplitSourceManager getSplitSourceManager() { return splitSourceManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 477f76301120d2..d0dbf2d8de0452 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1382,12 +1382,12 @@ public Collection getPartitions() { } // get only temp partitions - public Collection getAllTempPartitions() { + public List getAllTempPartitions() { return tempPartitions.getAllPartitions(); } // get all partitions including temp partitions - public Collection getAllPartitions() { + public List getAllPartitions() { List partitions = Lists.newArrayList(idToPartition.values()); partitions.addAll(tempPartitions.getAllPartitions()); return partitions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java new file mode 100644 index 00000000000000..499c3b46709578 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one 
+// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.cache; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.common.Config; +import org.apache.doris.common.ConfigBase.DefaultConfHandler; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound; +import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Range; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.hadoop.util.Lists; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import 
java.util.Optional; + +/** + * This cache is used to sort the table partitions by range, so we can do binary search to skip + * filter the huge numbers of partitions, for example, the table partition column is `dt date` + * and one date for one partition, range from '2017-01-01' to '2025-01-01', for partition predicate + * `where dt = '2024-12-24'`, we can fast jump to '2024-12-24' within few partition range comparison, + * and the QPS can be improved + */ +public class NereidsSortedPartitionsCacheManager { + private volatile Cache partitionCaches; + + public NereidsSortedPartitionsCacheManager() { + partitionCaches = buildCaches( + Config.cache_partition_meta_table_manage_num, + Config.expire_cache_partition_meta_table_in_fe_second + ); + } + + public Optional> get(OlapTable table) { + DatabaseIf database = table.getDatabase(); + if (database == null) { + return Optional.empty(); + } + CatalogIf catalog = database.getCatalog(); + if (catalog == null) { + return Optional.empty(); + } + TableIdentifier key = new TableIdentifier( + catalog.getName(), database.getFullName(), table.getName()); + PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key); + if (partitionCacheContext == null) { + return Optional.of(loadCache(key, table)); + } + if (table.getId() != partitionCacheContext.tableId + || table.getVisibleVersion() != partitionCacheContext.tableVersion) { + partitionCaches.invalidate(key); + return Optional.of(loadCache(key, table)); + } + return Optional.of(partitionCacheContext.sortedPartitionRanges); + } + + private SortedPartitionRanges loadCache(TableIdentifier key, OlapTable olapTable) { + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Map allPartitions = partitionInfo.getIdToItem(false); + List> sortedList = Lists.newArrayList(allPartitions.entrySet()); + List> sortedRanges = Lists.newArrayListWithCapacity(allPartitions.size()); + List> defaultPartitions = Lists.newArrayList(); + for (Entry entry : sortedList) { + 
PartitionItem partitionItem = entry.getValue(); + Long id = entry.getKey(); + if (!partitionItem.isDefaultPartition()) { + List> ranges = PartitionItemToRange.toRanges(partitionItem); + for (Range range : ranges) { + sortedRanges.add(new PartitionItemAndRange<>(id, partitionItem, range)); + } + } else { + defaultPartitions.add(new PartitionItemAndId<>(id, partitionItem)); + } + } + + sortedRanges.sort((o1, o2) -> { + Range span1 = o1.range; + Range span2 = o2.range; + int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint()); + if (result != 0) { + return result; + } + result = span1.upperEndpoint().compareTo(span2.upperEndpoint()); + return result; + }); + SortedPartitionRanges sortedPartitionRanges = new SortedPartitionRanges( + sortedRanges, defaultPartitions + ); + PartitionCacheContext context = new PartitionCacheContext( + olapTable.getId(), olapTable.getVisibleVersion(), sortedPartitionRanges); + partitionCaches.put(key, context); + return sortedPartitionRanges; + } + + private static Cache buildCaches( + int sortedPartitionTableManageNum, int expireSortedPartitionTableInFeSecond) { + Caffeine cacheBuilder = Caffeine.newBuilder() + // auto evict cache when jvm memory too low + .softValues(); + if (sortedPartitionTableManageNum > 0) { + cacheBuilder = cacheBuilder.maximumSize(sortedPartitionTableManageNum); + } + if (expireSortedPartitionTableInFeSecond > 0) { + cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireSortedPartitionTableInFeSecond)); + } + + return cacheBuilder.build(); + } + + public static synchronized void updateConfig() { + Env currentEnv = Env.getCurrentEnv(); + if (currentEnv == null) { + return; + } + NereidsSortedPartitionsCacheManager cacheManager = currentEnv.getSortedPartitionsCacheManager(); + if (cacheManager == null) { + return; + } + + Cache caches = buildCaches( + Config.cache_partition_meta_table_manage_num, + Config.expire_cache_partition_meta_table_in_fe_second + ); + 
caches.putAll(cacheManager.partitionCaches.asMap()); + cacheManager.partitionCaches = caches; + } + + @Data + @AllArgsConstructor + private static class TableIdentifier { + public final String catalog; + public final String db; + public final String table; + } + + private static class PartitionCacheContext { + private final long tableId; + private final long tableVersion; + private final SortedPartitionRanges sortedPartitionRanges; + + public PartitionCacheContext( + long tableId, long tableVersion, SortedPartitionRanges sortedPartitionRanges) { + this.tableId = tableId; + this.tableVersion = tableVersion; + this.sortedPartitionRanges = sortedPartitionRanges; + } + + @Override + public String toString() { + return "PartitionCacheContext(tableId=" + + tableId + ", tableVersion=" + tableVersion + + ", partitionNum=" + sortedPartitionRanges.sortedPartitions.size() + ")"; + } + } + + // NOTE: used in Config.cache_partition_meta_table_manage_num.callbackClassString and + // Config.expire_cache_partition_meta_table_in_fe_second.callbackClassString, + // don't remove it! 
+ public static class UpdateConfig extends DefaultConfHandler { + @Override + public void handle(Field field, String confVal) throws Exception { + super.handle(field, confVal); + NereidsSortedPartitionsCacheManager.updateConfig(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java rename to fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java index 86a2b875a93d68..aba0decb76e3a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common; +package org.apache.doris.common.cache; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.DatabaseIf; @@ -24,7 +24,10 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; +import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase.DefaultConfHandler; +import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.metric.MetricRepo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java new file mode 100644 index 00000000000000..078cc03733ce66 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.nereids.util.Utils; + +import java.util.List; +import java.util.Objects; + +/** MultiColumnBound */ +public class MultiColumnBound implements Comparable { + private final List columnBounds; + + public MultiColumnBound(List columnBounds) { + this.columnBounds = Utils.fastToImmutableList( + Objects.requireNonNull(columnBounds, "column bounds can not be null") + ); + } + + @Override + public int compareTo(MultiColumnBound o) { + for (int i = 0; i < columnBounds.size(); i++) { + ColumnBound columnBound = columnBounds.get(i); + ColumnBound otherColumnBound = o.columnBounds.get(i); + int result = columnBound.compareTo(otherColumnBound); + if (result != 0) { + return result; + } + } + return 0; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < columnBounds.size(); i++) { + if (i > 0) { + sb.append(","); + } + sb.append(columnBounds.get(i)); + } + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java new file mode 100644 index 00000000000000..435dd700713940 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.types.DataType; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; + +import java.util.List; + +/** PartitionItemToRange */ +public class PartitionItemToRange { + /** toRangeSets */ + public static List> toRanges(PartitionItem partitionItem) { + if (partitionItem instanceof RangePartitionItem) { + Range range = partitionItem.getItems(); + PartitionKey lowerKey = range.lowerEndpoint(); + ImmutableList.Builder lowerBounds + = ImmutableList.builderWithExpectedSize(lowerKey.getKeys().size()); + for (LiteralExpr key : lowerKey.getKeys()) { + Literal literal = toNereidsLiteral(key); + lowerBounds.add(ColumnBound.of(literal)); + } + + PartitionKey upperKey = range.upperEndpoint(); + ImmutableList.Builder upperBounds + = ImmutableList.builderWithExpectedSize(lowerKey.getKeys().size()); + for (LiteralExpr key : upperKey.getKeys()) { + Literal literal = Literal.fromLegacyLiteral(key, key.getType()); + upperBounds.add(ColumnBound.of(literal)); + } + + return ImmutableList.of(Range.closedOpen( + new MultiColumnBound(lowerBounds.build()), + new MultiColumnBound(upperBounds.build()) + )); + } else if (partitionItem instanceof ListPartitionItem) { + List partitionKeys = partitionItem.getItems(); + ImmutableList.Builder> ranges + = ImmutableList.builderWithExpectedSize(partitionKeys.size()); + for (PartitionKey partitionKey : partitionKeys) { + ImmutableList.Builder bounds + = ImmutableList.builderWithExpectedSize(partitionKeys.size()); + for (LiteralExpr key : 
partitionKey.getKeys()) { + Literal literal = toNereidsLiteral(key); + bounds.add(ColumnBound.of(literal)); + } + MultiColumnBound bound = new MultiColumnBound(bounds.build()); + ranges.add(Range.singleton(bound)); + } + return ranges.build(); + } else { + throw new UnsupportedOperationException(partitionItem.getClass().getName()); + } + } + + private static Literal toNereidsLiteral(LiteralExpr partitionKeyLiteral) { + if (!partitionKeyLiteral.isMinValue()) { + return Literal.fromLegacyLiteral(partitionKeyLiteral, partitionKeyLiteral.getType()); + } else { + return new NullLiteral(DataType.fromCatalogType(partitionKeyLiteral.getType())); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java new file mode 100644 index 00000000000000..88ecdab607c531 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.nereids.trees.expressions.And; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.LessThan; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; +import org.apache.doris.nereids.trees.expressions.Not; +import org.apache.doris.nereids.trees.expressions.NullSafeEqual; +import org.apache.doris.nereids.trees.expressions.Or; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.MaxLiteral; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableRangeSet; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import org.apache.hadoop.util.Lists; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** PartitionPredicateToRange */ +public class PartitionPredicateToRange extends DefaultExpressionVisitor, Void> { + private List columns; + private Set slotIds; + + /** PartitionPredicateToRange */ + public PartitionPredicateToRange(List columns) { + this.columns = 
Utils.fastToImmutableList( + Objects.requireNonNull(columns, "columns can not be null") + ); + + ImmutableSet.Builder slotIds = ImmutableSet.builderWithExpectedSize(columns.size()); + for (Slot column : columns) { + slotIds.add(column.getExprId().asInt()); + } + this.slotIds = slotIds.build(); + } + + @Override + public RangeSet visitAnd(And and, Void context) { + boolean first = true; + RangeSet intersects = null; + for (Expression child : and.children()) { + RangeSet childRanges = child.accept(this, context); + + // a conjunct that can not be converted is skipped safely: dropping it only widens the + // result, so the ranges still contain all partitions the predicates need + if (childRanges == null) { + continue; + } else if (first) { + first = false; + intersects = childRanges; + continue; + } + + // intersect with the union of the child's ranges, not with each disjoint range in turn + RangeSet<MultiColumnBound> narrowed = TreeRangeSet.create(); + for (Range childRange : childRanges.asRanges()) { + narrowed.addAll(intersects.subRangeSet(childRange)); + } + intersects = narrowed; + if (intersects.isEmpty()) { + break; + } + } + return intersects; + } + + @Override + public RangeSet visitOr(Or or, Void context) { + RangeSet intersects = TreeRangeSet.create(); + for (Expression child : or.children()) { + RangeSet childRanges = child.accept(this, context); + + // if any predicate can not parse to range, we can not do binary search + if (childRanges == null) { + return null; + } + intersects.addAll(childRanges); + } + return intersects; + } + + @Override + public RangeSet visitNot(Not not, Void context) { + Expression child = not.child(); + if (child instanceof IsNull && ((IsNull) child).child() instanceof SlotReference) { + SlotReference slot = (SlotReference) ((IsNull) child).child(); + int slotId = slot.getExprId().asInt(); + if (slotIds.contains(slotId)) { + DataType dataType = slot.getDataType(); + return toRangeSet(slot, + new NullLiteral(dataType), BoundType.OPEN, + new MaxLiteral(dataType), BoundType.CLOSED + ); + } + } + return null; + } + + @Override + public RangeSet visitIsNull(IsNull isNull, Void context) { + Expression
child = isNull.child(); + if (child instanceof SlotReference && slotIds.contains(((SlotReference) child).getExprId().asInt())) { + NullLiteral nullLiteral = new NullLiteral(child.getDataType()); + return toRangeSet((SlotReference) child, nullLiteral, BoundType.CLOSED, nullLiteral, BoundType.CLOSED); + } + return null; + } + + @Override + public RangeSet visitEqualTo(EqualTo equalTo, Void context) { + Expression left = equalTo.left(); + Expression right = equalTo.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Literal literal = (Literal) right; + return toRangeSet((SlotReference) left, literal, BoundType.CLOSED, literal, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitNullSafeEqual(NullSafeEqual nullSafeEqual, Void context) { + Expression left = nullSafeEqual.left(); + Expression right = nullSafeEqual.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Literal literal = (Literal) right; + return toRangeSet((SlotReference) left, literal, BoundType.CLOSED, literal, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitInPredicate(InPredicate inPredicate, Void context) { + Expression compareExpr = inPredicate.getCompareExpr(); + if (compareExpr instanceof SlotReference) { + SlotReference slot = (SlotReference) compareExpr; + if (slotIds.contains((slot).getExprId().asInt())) { + RangeSet union = TreeRangeSet.create(); + for (Expression option : inPredicate.getOptions()) { + if (!(option instanceof Literal)) { + return null; + } + Literal literal = (Literal) option; + union.addAll( + toRangeSet(slot, literal, BoundType.CLOSED, literal, BoundType.CLOSED) + ); + } + return union; + } + } + return null; + } + + @Override + public RangeSet visitLessThan(LessThan lessThan, Void context) { + Expression left = 
lessThan.left(); + Expression right = lessThan.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + NullLiteral nullLiteral = new NullLiteral(right.getDataType()); + Literal literal = (Literal) right; + return toRangeSet((SlotReference) left, nullLiteral, BoundType.OPEN, literal, BoundType.OPEN); + } + } + return null; + } + + @Override + public RangeSet visitLessThanEqual(LessThanEqual lessThanEqual, Void context) { + Expression left = lessThanEqual.left(); + Expression right = lessThanEqual.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + NullLiteral nullLiteral = new NullLiteral(right.getDataType()); + Literal literal = (Literal) right; + return toRangeSet((SlotReference) left, nullLiteral, BoundType.OPEN, literal, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitGreaterThan(GreaterThan greaterThan, Void context) { + Expression left = greaterThan.left(); + Expression right = greaterThan.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Literal literal = (Literal) right; + MaxLiteral maxLiteral = new MaxLiteral(right.getDataType()); + return toRangeSet((SlotReference) left, literal, BoundType.OPEN, maxLiteral, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitGreaterThanEqual(GreaterThanEqual greaterThanEqual, Void context) { + Expression left = greaterThanEqual.left(); + Expression right = greaterThanEqual.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Literal literal = (Literal) right; + MaxLiteral maxLiteral = new MaxLiteral(right.getDataType()); + return toRangeSet((SlotReference) left, literal, 
BoundType.CLOSED, maxLiteral, BoundType.CLOSED); + } + } + return null; + } + + private RangeSet toRangeSet(SlotReference slotReference, + Literal columnLowerBound, BoundType lowerBoundType, + Literal columnUpperBound, BoundType upperBoundType) { + List lowerBounds = Lists.newArrayListWithCapacity(columns.size()); + List upperBounds = Lists.newArrayListWithCapacity(columns.size()); + for (Slot column : columns) { + if (column.getExprId().asInt() == slotReference.getExprId().asInt()) { + lowerBounds.add(ColumnBound.of(columnLowerBound)); + upperBounds.add(ColumnBound.of(columnUpperBound)); + } else { + lowerBounds.add(ColumnBound.of(new NullLiteral(column.getDataType()))); + upperBounds.add(ColumnBound.of(new MaxLiteral(column.getDataType()))); + } + } + MultiColumnBound lowerBound = new MultiColumnBound(lowerBounds); + MultiColumnBound upperBound = new MultiColumnBound(upperBounds); + + Range range = Range.range(lowerBound, lowerBoundType, upperBound, upperBoundType); + return ImmutableRangeSet.of(range); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java index ed783aa3d5a9b6..6fc3aabf25e21b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java @@ -22,6 +22,8 @@ import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange; import org.apache.doris.nereids.trees.expressions.Cast; import
org.apache.doris.nereids.trees.expressions.ComparisonPredicate; import org.apache.doris.nereids.trees.expressions.Expression; @@ -39,17 +41,22 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.Sets; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; +import java.util.Set; /** * PartitionPruner */ public class PartitionPruner extends DefaultExpressionRewriter { - private final List partitions; + private final List> partitions; private final Expression partitionPredicate; /** Different type of table may have different partition prune behavior. */ @@ -58,7 +65,7 @@ public enum PartitionTableType { EXTERNAL } - private PartitionPruner(List partitions, Expression partitionPredicate) { + private PartitionPruner(List> partitions, Expression partitionPredicate) { this.partitions = Objects.requireNonNull(partitions, "partitions cannot be null"); this.partitionPredicate = Objects.requireNonNull(partitionPredicate.accept(this, null), "partitionPredicate cannot be null"); @@ -102,22 +109,29 @@ public Expression visitComparisonPredicate(ComparisonPredicate cp, Void context) } /** prune */ - public List prune() { + public > List prune() { Builder scanPartitionIdents = ImmutableList.builder(); for (OnePartitionEvaluator partition : partitions) { - if (!canBePrunedOut(partition)) { + if (!canBePrunedOut(partitionPredicate, partition)) { scanPartitionIdents.add((K) partition.getPartitionIdent()); } } return scanPartitionIdents.build(); } + public static > List prune(List partitionSlots, Expression partitionPredicate, + Map idToPartitions, CascadesContext cascadesContext, + PartitionTableType partitionTableType) { + return prune(partitionSlots, partitionPredicate, idToPartitions, + 
cascadesContext, partitionTableType, Optional.empty()); + } + /** * prune partition with `idToPartitions` as parameter. */ - public static List prune(List partitionSlots, Expression partitionPredicate, + public static > List prune(List partitionSlots, Expression partitionPredicate, Map idToPartitions, CascadesContext cascadesContext, - PartitionTableType partitionTableType) { + PartitionTableType partitionTableType, Optional> sortedPartitionRanges) { partitionPredicate = PartitionPruneExpressionExtractor.extract( partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext); partitionPredicate = PredicateRewriteForPartitionPrune.rewrite(partitionPredicate, cascadesContext); @@ -134,39 +148,124 @@ public static List prune(List partitionSlots, Expression partitionP return ImmutableList.of(); } - List evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); - for (Entry kv : idToPartitions.entrySet()) { - evaluators.add(toPartitionEvaluator( - kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); + if (sortedPartitionRanges.isPresent()) { + RangeSet predicateRanges = partitionPredicate.accept( + new PartitionPredicateToRange(partitionSlots), null); + if (predicateRanges != null) { + return binarySearchFiltering( + sortedPartitionRanges.get(), partitionSlots, partitionPredicate, cascadesContext, + expandThreshold, predicateRanges + ); + } } - PartitionPruner partitionPruner = new PartitionPruner(evaluators, partitionPredicate); - //TODO: we keep default partition because it's too hard to prune it, we return false in canPrune(). 
- return partitionPruner.prune(); + + return sequentialFiltering( + idToPartitions, partitionSlots, partitionPredicate, cascadesContext, expandThreshold + ); } /** * convert partition item to partition evaluator */ - public static final OnePartitionEvaluator toPartitionEvaluator(K id, PartitionItem partitionItem, + public static OnePartitionEvaluator toPartitionEvaluator(K id, PartitionItem partitionItem, List partitionSlots, CascadesContext cascadesContext, int expandThreshold) { if (partitionItem instanceof ListPartitionItem) { - return new OneListPartitionEvaluator( + return new OneListPartitionEvaluator<>( id, partitionSlots, (ListPartitionItem) partitionItem, cascadesContext); } else if (partitionItem instanceof RangePartitionItem) { - return new OneRangePartitionEvaluator( + return new OneRangePartitionEvaluator<>( id, partitionSlots, (RangePartitionItem) partitionItem, cascadesContext, expandThreshold); } else { - return new UnknownPartitionEvaluator(id, partitionItem); + return new UnknownPartitionEvaluator<>(id, partitionItem); + } + } + + private static > List binarySearchFiltering( + SortedPartitionRanges sortedPartitionRanges, List partitionSlots, + Expression partitionPredicate, CascadesContext cascadesContext, int expandThreshold, + RangeSet predicateRanges) { + List> sortedPartitions = sortedPartitionRanges.sortedPartitions; + + Set selectedIdSets = Sets.newTreeSet(); + int leftIndex = 0; + for (Range predicateRange : predicateRanges.asRanges()) { + int rightIndex = sortedPartitions.size(); + if (leftIndex >= rightIndex) { + break; + } + + int midIndex; + MultiColumnBound predicateUpperBound = predicateRange.upperEndpoint(); + MultiColumnBound predicateLowerBound = predicateRange.lowerEndpoint(); + + while (leftIndex + 1 < rightIndex) { + midIndex = (leftIndex + rightIndex) / 2; + PartitionItemAndRange partition = sortedPartitions.get(midIndex); + Range partitionSpan = partition.range; + + if 
(predicateUpperBound.compareTo(partitionSpan.lowerEndpoint()) < 0) { + rightIndex = midIndex; + } else if (predicateLowerBound.compareTo(partitionSpan.upperEndpoint()) > 0) { + leftIndex = midIndex; + } else { + break; + } + } + + for (; leftIndex < sortedPartitions.size(); leftIndex++) { + PartitionItemAndRange partition = sortedPartitions.get(leftIndex); + + K partitionId = partition.id; + // list partition will expand to multiple PartitionItemAndRange, we should skip evaluate it again + if (selectedIdSets.contains(partitionId)) { + continue; + } + + Range partitionSpan = partition.range; + if (predicateUpperBound.compareTo(partitionSpan.lowerEndpoint()) < 0) { + break; + } + + OnePartitionEvaluator partitionEvaluator = toPartitionEvaluator( + partitionId, partition.partitionItem, partitionSlots, cascadesContext, expandThreshold); + if (!canBePrunedOut(partitionPredicate, partitionEvaluator)) { + selectedIdSets.add(partitionId); + } + } + } + + for (PartitionItemAndId defaultPartition : sortedPartitionRanges.defaultPartitions) { + K partitionId = defaultPartition.id; + OnePartitionEvaluator partitionEvaluator = toPartitionEvaluator( + partitionId, defaultPartition.partitionItem, partitionSlots, cascadesContext, expandThreshold); + if (!canBePrunedOut(partitionPredicate, partitionEvaluator)) { + selectedIdSets.add(partitionId); + } } + + return Utils.fastToImmutableList(selectedIdSets); + } + + private static > List sequentialFiltering( + Map idToPartitions, List partitionSlots, + Expression partitionPredicate, CascadesContext cascadesContext, int expandThreshold) { + List> evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); + for (Entry kv : idToPartitions.entrySet()) { + evaluators.add(toPartitionEvaluator( + kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); + } + PartitionPruner partitionPruner = new PartitionPruner(evaluators, partitionPredicate); + //TODO: we keep default partition because it's too hard to prune 
it, we return false in canPrune(). + return partitionPruner.prune(); } /** * return true if partition is not qualified. that is, can be pruned out. */ - private boolean canBePrunedOut(OnePartitionEvaluator evaluator) { + private static boolean canBePrunedOut(Expression partitionPredicate, OnePartitionEvaluator evaluator) { List> onePartitionInputs = evaluator.getOnePartitionInputs(); for (Map currentInputs : onePartitionInputs) { - // evaluate wether there's possible for this partition to accept this predicate + // evaluate whether there's possible for this partition to accept this predicate Expression result = evaluator.evaluateWithDefaultPartition(partitionPredicate, currentInputs); if (!result.equals(BooleanLiteral.FALSE) && !(result instanceof NullLiteral)) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java new file mode 100644 index 00000000000000..50d4cb3befa538 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.collect.Range; + +import java.util.List; +import java.util.Objects; + +/** SortedPartitionRanges: holds two lists, sortedPartitions (partition items paired with an id and a Range) and defaultPartitions (partition items paired only with an id). NOTE(review): nothing in this class sorts the list; presumably the caller supplies it already ordered - confirm. */ +public class SortedPartitionRanges { + public final List> sortedPartitions; + public final List> defaultPartitions; + + /** Stores both lists after passing them through Utils.fastToImmutableList; Objects.requireNonNull rejects a null list with a NullPointerException. */ + public SortedPartitionRanges( + List> sortedPartitions, List> defaultPartitions) { + this.sortedPartitions = Utils.fastToImmutableList( + Objects.requireNonNull(sortedPartitions, "sortedPartitions bounds can not be null") + ); + this.defaultPartitions = Utils.fastToImmutableList( + Objects.requireNonNull(defaultPartitions, "defaultPartitions bounds can not be null") + ); + } + + /** PartitionItemAndRange: one partition id together with its PartitionItem and a Range. partitionItem and range must be non-null; id is stored without a null check. toString() prints only the range. */ + public static class PartitionItemAndRange { + public final K id; + public final PartitionItem partitionItem; + public final Range range; + + public PartitionItemAndRange(K id, PartitionItem partitionItem, Range range) { + this.id = id; + this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem can not be null"); + this.range = Objects.requireNonNull(range, "range can not be null"); + } + + @Override + public String toString() { + return range.toString(); + } + } + + /** PartitionItemAndId: one partition id together with its PartitionItem and no range. partitionItem must be non-null; id is stored without a null check. */ + public static class PartitionItemAndId { + public final K id; + public final PartitionItem partitionItem; + + public PartitionItemAndId(K id, PartitionItem partitionItem) { + this.id = id; + this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem can not be null"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java index
0d5054086117d9..c5868b4b68ae30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java @@ -18,13 +18,16 @@ package org.apache.doris.nereids.rules.rewrite; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.expression.rules.PartitionPruner; import org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; @@ -37,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -76,9 +80,16 @@ public Rule build() { partitionSlots.add(partitionSlot); } } + NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager = Env.getCurrentEnv() + .getSortedPartitionsCacheManager(); List manuallySpecifiedPartitions = scan.getManuallySpecifiedPartitions(); Map idToPartitions; + Optional> sortedPartitionRanges = Optional.empty(); if (manuallySpecifiedPartitions.isEmpty()) { + Optional> sortedPartitionRangesOpt = sortedPartitionsCacheManager.get(table); + if (sortedPartitionRangesOpt.isPresent()) { + sortedPartitionRanges = (Optional) sortedPartitionRangesOpt; + } idToPartitions = partitionInfo.getIdToItem(false); } else { Map 
allPartitions = partitionInfo.getAllPartitions(); @@ -88,7 +99,7 @@ public Rule build() { } List prunedPartitions = PartitionPruner.prune( partitionSlots, filter.getPredicate(), idToPartitions, ctx.cascadesContext, - PartitionTableType.OLAP); + PartitionTableType.OLAP, sortedPartitionRanges); if (prunedPartitions.isEmpty()) { return new LogicalEmptyRelation( ConnectContext.get().getStatementContext().getNextRelationId(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index e757f3153db038..055a4c31e9075e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -108,10 +108,10 @@ import org.apache.doris.common.FormatOptions; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NereidsException; -import org.apache.doris.common.NereidsSqlCacheManager; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; +import org.apache.doris.common.cache.NereidsSqlCacheManager; import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.profile.SummaryProfile;