Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](nereids) use binary search to prune partitions #44586

Merged
merged 11 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1429,10 +1429,34 @@ public class Config extends ConfigBase {
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig",
description = {
"当前默认设置为 300,用来控制控制NereidsSqlCacheManager中sql cache过期时间,超过一段时间不访问cache会被回收",
"The current default setting is 300, which is used to control the expiration time of SQL cache"
+ "in NereidsSqlCacheManager. If the cache is not accessed for a period of time, "
+ "it will be reclaimed"
}
)
public static int expire_sql_cache_in_fe_second = 300;


/**
* Expire sql sql in frontend time
*/
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig",
description = {
"当前默认设置为 300,用来控制控制NereidsSortedPartitionsCacheManager中分区元数据缓存过期时间,"
+ "超过一段时间不访问cache会被回收",
"The current default setting is 300, which is used to control the expiration time of "
+ "the partition metadata cache in NereidsSortedPartitionsCheManager. "
+ "If the cache is not accessed for a period of time, it will be reclaimed"
}
)
public static int expire_cache_partition_meta_table_in_fe_second = 300;

/**
* Set the maximum number of rows that can be cached
*/
Expand Down Expand Up @@ -2275,8 +2299,7 @@ public class Config extends ConfigBase {
*/
@ConfField(
mutable = true,
varType = VariableAnnotation.EXPERIMENTAL,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig",
callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig",
description = {
"当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。",
"Now default set to 100, this config is used to control the number of "
Expand All @@ -2285,6 +2308,19 @@ public class Config extends ConfigBase {
)
public static int sql_cache_manage_num = 100;

@ConfField(
mutable = true,
callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig",
description = {
"当前默认设置为 100,用来控制控制NereidsSortedPartitionsCacheManager中有序分区元数据的缓存个数,"
+ "用于加速分区裁剪",
"The current default setting is 100, which is used to control the number of ordered "
+ "partition metadata caches in NereidsSortedPartitionsCacheManager, "
+ "and to accelerate partition pruning"
}
)
public static int cache_partition_meta_table_manage_num = 100;

/**
* Maximum number of events to poll in each RPC.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public interface ConfHandler {
void handle(Field field, String confVal) throws Exception;
}

static class DefaultConfHandler implements ConfHandler {
public static class DefaultConfHandler implements ConfHandler {
@Override
public void handle(Field field, String confVal) throws Exception {
setConfigField(field, confVal);
Expand Down
10 changes: 9 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.NereidsSqlCacheManager;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager;
import org.apache.doris.common.cache.NereidsSqlCacheManager;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantLock;
Expand Down Expand Up @@ -567,6 +568,8 @@ public class Env {

private final NereidsSqlCacheManager sqlCacheManager;

private final NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager;

private final SplitSourceManager splitSourceManager;

private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr;
Expand Down Expand Up @@ -819,6 +822,7 @@ public Env(boolean isCheckpointCatalog) {
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
this.sortedPartitionsCacheManager = new NereidsSortedPartitionsCacheManager();
this.splitSourceManager = new SplitSourceManager();
this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr();
this.tokenManager = new TokenManager();
Expand Down Expand Up @@ -6659,6 +6663,10 @@ public NereidsSqlCacheManager getSqlCacheManager() {
return sqlCacheManager;
}

public NereidsSortedPartitionsCacheManager getSortedPartitionsCacheManager() {
return sortedPartitionsCacheManager;
}

public SplitSourceManager getSplitSourceManager() {
return splitSourceManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,12 +1382,12 @@ public Collection<Partition> getPartitions() {
}

// get only temp partitions
public Collection<Partition> getAllTempPartitions() {
public List<Partition> getAllTempPartitions() {
return tempPartitions.getAllPartitions();
}

// get all partitions including temp partitions
public Collection<Partition> getAllPartitions() {
public List<Partition> getAllPartitions() {
List<Partition> partitions = Lists.newArrayList(idToPartition.values());
partitions.addAll(tempPartitions.getAllPartitions());
return partitions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.cache;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound;
import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Range;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.hadoop.util.Lists;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;

/** NereidsSortedPartitionsCacheManager */
924060929 marked this conversation as resolved.
Show resolved Hide resolved
public class NereidsSortedPartitionsCacheManager {
private volatile Cache<TableIdentifier, PartitionCacheContext> partitionCaches;

public NereidsSortedPartitionsCacheManager() {
partitionCaches = buildCaches(
Config.cache_partition_meta_table_manage_num,
Config.expire_cache_partition_meta_table_in_fe_second
);
}

public Optional<SortedPartitionRanges<?>> get(OlapTable table) {
DatabaseIf<?> database = table.getDatabase();
if (database == null) {
return Optional.empty();
}
CatalogIf<?> catalog = database.getCatalog();
if (catalog == null) {
return Optional.empty();
}
TableIdentifier key = new TableIdentifier(
catalog.getName(), database.getFullName(), table.getName());
PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key);
if (partitionCacheContext == null) {
return Optional.of(loadCache(key, table));
}
if (table.getId() != partitionCacheContext.tableId
|| table.getVisibleVersion() != partitionCacheContext.tableVersion) {
partitionCaches.invalidate(key);
return Optional.empty();
924060929 marked this conversation as resolved.
Show resolved Hide resolved
}
return Optional.of(partitionCacheContext.sortedPartitionRanges);
}

private SortedPartitionRanges<?> loadCache(TableIdentifier key, OlapTable olapTable) {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, PartitionItem> allPartitions = partitionInfo.getIdToItem(false);
List<Entry<Long, PartitionItem>> sortedList = Lists.newArrayList(allPartitions.entrySet());
List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(allPartitions.size());
List<PartitionItemAndId<?>> defaultPartitions = Lists.newArrayList();
for (Entry<Long, PartitionItem> entry : sortedList) {
PartitionItem partitionItem = entry.getValue();
Long id = entry.getKey();
if (!partitionItem.isDefaultPartition()) {
List<Range<MultiColumnBound>> ranges = PartitionItemToRange.toRanges(partitionItem);
for (Range<MultiColumnBound> range : ranges) {
sortedRanges.add(new PartitionItemAndRange<>(id, partitionItem, range));
}
} else {
defaultPartitions.add(new PartitionItemAndId<>(id, partitionItem));
}
}

sortedRanges.sort((o1, o2) -> {
Range<MultiColumnBound> span1 = o1.range;
Range<MultiColumnBound> span2 = o2.range;
int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint());
if (result != 0) {
return result;
}
result = span1.upperEndpoint().compareTo(span2.upperEndpoint());
return result;
});
SortedPartitionRanges<?> sortedPartitionRanges = new SortedPartitionRanges(
sortedRanges, defaultPartitions
);
PartitionCacheContext context = new PartitionCacheContext(
olapTable.getId(), olapTable.getVisibleVersion(), sortedPartitionRanges);
partitionCaches.put(key, context);
return sortedPartitionRanges;
}

private static Cache<TableIdentifier, PartitionCacheContext> buildCaches(
int sortedPartitionTableManageNum, int expireSortedPartitionTableInFeSecond) {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
// auto evict cache when jvm memory too low
.softValues();
if (sortedPartitionTableManageNum > 0) {
cacheBuilder = cacheBuilder.maximumSize(sortedPartitionTableManageNum);
}
if (expireSortedPartitionTableInFeSecond > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireSortedPartitionTableInFeSecond));
}

return cacheBuilder.build();
}

public static synchronized void updateConfig() {
Env currentEnv = Env.getCurrentEnv();
if (currentEnv == null) {
return;
}
NereidsSortedPartitionsCacheManager cacheManager = currentEnv.getSortedPartitionsCacheManager();
if (cacheManager == null) {
return;
}

Cache<TableIdentifier, PartitionCacheContext> caches = buildCaches(
Config.cache_partition_meta_table_manage_num,
Config.expire_cache_partition_meta_table_in_fe_second
);
caches.putAll(cacheManager.partitionCaches.asMap());
cacheManager.partitionCaches = caches;
}

@Data
@AllArgsConstructor
private static class TableIdentifier {
public final String catalog;
public final String db;
public final String table;
}

private static class PartitionCacheContext {
private final long tableId;
private final long tableVersion;
private final SortedPartitionRanges sortedPartitionRanges;

public PartitionCacheContext(
long tableId, long tableVersion, SortedPartitionRanges sortedPartitionRanges) {
this.tableId = tableId;
this.tableVersion = tableVersion;
this.sortedPartitionRanges = sortedPartitionRanges;
}

@Override
public String toString() {
return "PartitionCacheContext(tableId="
+ tableId + ", tableVersion=" + tableVersion
+ ", partitionNum=" + sortedPartitionRanges.sortedPartitions.size() + ")";
}
}

// NOTE: used in Config.cache_partition_meta_table_manage_num.callbackClassString and
// Config.expire_cache_partition_meta_table_in_fe_second.callbackClassString,
// don't remove it!
public static class UpdateConfig extends DefaultConfHandler {
@Override
public void handle(Field field, String confVal) throws Exception {
super.handle(field, confVal);
NereidsSortedPartitionsCacheManager.updateConfig();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;
package org.apache.doris.common.cache;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
Expand All @@ -24,7 +24,10 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
Expand Down
Loading
Loading