diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 993702c4dac57f8..c5f258247b10840 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2134,6 +2134,10 @@ public class Config extends ConfigBase {
"Max cache number of external table row count"})
public static long max_external_table_row_count_cache_num = 100000;
+ @ConfField(description = {"每个查询的外表文件元数据缓存的最大数量。",
+ "Max cache number of external table split file meta cache at query level."})
+ public static long max_external_table_split_file_meta_cache_num = 10000;
+
/**
* Max cache loader thread-pool size.
* Max thread pool size for loading external meta cache
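Reviewer note: this hunk only adds the sizing knob; the cache it bounds is built elsewhere in the PR. A minimal sketch of the intended use, assuming the query-level split file meta cache goes through the EvictableCacheBuilder introduced below (illustrative wiring, not part of this diff):

```java
// Hypothetical wiring: bound the per-query split file meta cache by the new config.
// A value of 0 disables caching (the EmptyCache path shown later in this diff).
Cache<Object, Object> splitFileMetaCache = EvictableCacheBuilder.newBuilder()
        .maximumSize(Config.max_external_table_split_file_meta_cache_num)
        .shareNothingWhenDisabled()
        .build();
```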
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java
index 50f46647975e235..4b2b8a2a6cd0b3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java
@@ -19,11 +19,13 @@
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Ticker;
+import com.github.benmanes.caffeine.cache.Weigher;
import org.jetbrains.annotations.NotNull;
import java.time.Duration;
@@ -44,28 +46,57 @@
* The cache can be created with the above parameters using the buildCache and buildAsyncCache methods.
*
*/
-public class CacheFactory {
+public class CacheFactory<K, V> {
private OptionalLong expireAfterWriteSec;
private OptionalLong refreshAfterWriteSec;
- private long maxSize;
+ private OptionalLong maxSize;
private boolean enableStats;
// Ticker is used to provide a time source for the cache.
// Only used for test, to provide a fake time source.
// If not provided, the system time is used.
private Ticker ticker;
+ private OptionalLong maxWeight;
+
+ private Weigher<K, V> weigher;
+
public CacheFactory(
OptionalLong expireAfterWriteSec,
OptionalLong refreshAfterWriteSec,
long maxSize,
boolean enableStats,
Ticker ticker) {
+ this(expireAfterWriteSec, refreshAfterWriteSec, OptionalLong.of(maxSize), enableStats, ticker,
+ OptionalLong.empty(), null);
+ }
+
+ public CacheFactory(
+ OptionalLong expireAfterWriteSec,
+ OptionalLong refreshAfterWriteSec,
+ boolean enableStats,
+ Ticker ticker,
+ long maxWeight,
+ Weigher<K, V> weigher) {
+ this(expireAfterWriteSec, refreshAfterWriteSec, OptionalLong.empty(), enableStats, ticker,
+ OptionalLong.of(maxWeight), weigher);
+ }
+
+ private CacheFactory(
+ OptionalLong expireAfterWriteSec,
+ OptionalLong refreshAfterWriteSec,
+ OptionalLong maxSize,
+ boolean enableStats,
+ Ticker ticker,
+ OptionalLong maxWeight,
+ Weigher<K, V> weigher) {
this.expireAfterWriteSec = expireAfterWriteSec;
this.refreshAfterWriteSec = refreshAfterWriteSec;
this.maxSize = maxSize;
this.enableStats = enableStats;
this.ticker = ticker;
+ this.maxWeight = maxWeight;
+ this.weigher = weigher;
}
// Build a loading cache, without executor, it will use fork-join pool for refresh
@@ -85,6 +116,11 @@ public <K, V> LoadingCache<K, V> buildCache(CacheLoader<K, V> cacheLoader,
return builder.build(cacheLoader);
}
+ public <K, V> Cache<K, V> buildCache() {
+ Caffeine<Object, Object> builder = buildWithParams();
+ return builder.build();
+ }
+
// Build an async loading cache
public <K, V> AsyncLoadingCache<K, V> buildAsyncCache(AsyncCacheLoader<K, V> cacheLoader,
ExecutorService executor) {
@@ -94,9 +130,11 @@ public <K, V> AsyncLoadingCache<K, V> buildAsyncCache(AsyncCacheLoader<K, V> cac
}
@NotNull
- private Caffeine buildWithParams() {
+ private Caffeine<Object, Object> buildWithParams() {
 Caffeine<Object, Object> builder = Caffeine.newBuilder();
- builder.maximumSize(maxSize);
+ if (maxSize.isPresent()) {
+ builder.maximumSize(maxSize.getAsLong());
+ }
if (expireAfterWriteSec.isPresent()) {
builder.expireAfterWrite(Duration.ofSeconds(expireAfterWriteSec.getAsLong()));
@@ -112,6 +150,14 @@ private Caffeine buildWithParams() {
if (ticker != null) {
builder.ticker(ticker);
}
+
+ if (maxWeight.isPresent()) {
+ builder.maximumWeight(maxWeight.getAsLong());
+ }
+
+ if (weigher != null) {
+ builder.weigher(weigher);
+ }
return builder;
}
}
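Reviewer note: the new overload keeps size-based and weight-based bounds mutually exclusive (maxSize and maxWeight are never both set). A minimal usage sketch of the weight-based constructor; the key/value types and the weigher are illustrative, not taken from this PR:

```java
// Hypothetical: charge each entry by its file count, so maxWeight caps the total
// number of cached file names rather than the number of keys.
CacheFactory<String, List<String>> listingCacheFactory = new CacheFactory<>(
        OptionalLong.of(86400L),  // expireAfterWriteSec
        OptionalLong.empty(),     // refreshAfterWriteSec
        true,                     // enableStats
        null,                     // ticker; null means system time
        100000L,                  // maxWeight
        (String location, List<String> files) -> files.size());
Cache<String, List<String>> listingCache = listingCacheFactory.buildCache();
```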
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
new file mode 100644
index 000000000000000..5942eb2b1f3a9aa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.cache.AbstractCache.SimpleStatsCounter;
+import com.google.common.cache.AbstractCache.StatsCounter;
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+class EmptyCache<K, V>
+ extends AbstractLoadingCache<K, V> {
+ private final CacheLoader<? super K, V> loader;
+ private final StatsCounter statsCounter;
+
+ EmptyCache(CacheLoader<? super K, V> loader, boolean recordStats) {
+ this.loader = Objects.requireNonNull(loader, "loader is null");
+ this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter();
+ }
+
+ @Override
+ public V getIfPresent(Object key) {
+ statsCounter.recordMisses(1);
+ return null;
+ }
+
+ @Override
+ public V get(K key)
+ throws ExecutionException {
+ return get(key, () -> loader.load(key));
+ }
+
+ @Override
+ public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
+ throws ExecutionException {
+ try {
+ Set<K> keySet = ImmutableSet.copyOf(keys);
+ statsCounter.recordMisses(keySet.size());
+ @SuppressWarnings("unchecked") // safe since all keys extend K
+ ImmutableMap<K, V> result = (ImmutableMap<K, V>) loader.loadAll(keySet);
+ for (K key : keySet) {
+ if (!result.containsKey(key)) {
+ throw new InvalidCacheLoadException("loadAll failed to return a value for " + key);
+ }
+ }
+ statsCounter.recordLoadSuccess(1);
+ return result;
+ } catch (RuntimeException e) {
+ statsCounter.recordLoadException(1);
+ throw new UncheckedExecutionException(e);
+ } catch (Exception e) {
+ statsCounter.recordLoadException(1);
+ throw new ExecutionException(e);
+ }
+ }
+
+ @Override
+ public V get(K key, Callable<? extends V> valueLoader)
+ throws ExecutionException {
+ statsCounter.recordMisses(1);
+ try {
+ V value = valueLoader.call();
+ statsCounter.recordLoadSuccess(1);
+ return value;
+ } catch (RuntimeException e) {
+ statsCounter.recordLoadException(1);
+ throw new UncheckedExecutionException(e);
+ } catch (Exception e) {
+ statsCounter.recordLoadException(1);
+ throw new ExecutionException(e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ // Cache, even if configured to evict everything immediately, should allow writes.
+ }
+
+ @Override
+ public void refresh(K key) {}
+
+ @Override
+ public void invalidate(Object key) {}
+
+ @Override
+ public void invalidateAll(Iterable<?> keys) {}
+
+ @Override
+ public void invalidateAll() {
+
+ }
+
+ @Override
+ public long size() {
+ return 0;
+ }
+
+ @Override
+ public CacheStats stats() {
+ return statsCounter.snapshot();
+ }
+
+ @Override
+ public ConcurrentMap<K, V> asMap() {
+ return new ConcurrentMap<K, V>() {
+ @Override
+ public V putIfAbsent(K key, V value) {
+ // Cache, even if configured to evict everything immediately, should allow writes.
+ // putIfAbsent returns the previous value
+ return null;
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ return false;
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ return false;
+ }
+
+ @Override
+ public V replace(K key, V value) {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return false;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return false;
+ }
+
+ @Override
+ @Nullable
+ public V get(Object key) {
+ return null;
+ }
+
+ @Override
+ @Nullable
+ public V put(K key, V value) {
+ // Cache, even if configured to evict everything immediately, should allow writes.
+ return null;
+ }
+
+ @Override
+ @Nullable
+ public V remove(Object key) {
+ return null;
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ // Cache, even if configured to evict everything immediately, should allow writes.
+ }
+
+ @Override
+ public void clear() {
+
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return ImmutableSet.of();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return ImmutableSet.of();
+ }
+
+ @Override
+ public Set<Map.Entry<K, V>> entrySet() {
+ return ImmutableSet.of();
+ }
+ };
+ }
+
+ private static class NoopStatsCounter
+ implements StatsCounter {
+ private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot();
+
+ @Override
+ public void recordHits(int count) {}
+
+ @Override
+ public void recordMisses(int count) {}
+
+ @Override
+ public void recordLoadSuccess(long loadTime) {}
+
+ @Override
+ public void recordLoadException(long loadTime) {}
+
+ @Override
+ public void recordEviction() {}
+
+ @Override
+ public CacheStats snapshot() {
+ return EMPTY_STATS;
+ }
+ }
+}
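Reviewer note: EmptyCache is what shareNothingWhenDisabled() produces for a disabled cache; it records misses and loads but never retains anything. A sketch of the observable behavior (types illustrative):

```java
// Hypothetical: a zero-sized cache built through EvictableCacheBuilder becomes
// an EmptyCache. Every read is a miss that re-runs the loader; put() is a no-op.
LoadingCache<String, Integer> disabled = EvictableCacheBuilder.newBuilder()
        .maximumSize(0)              // disables caching
        .shareNothingWhenDisabled()  // select the EmptyCache implementation
        .build(CacheLoader.from((String key) -> key.length()));

disabled.getUnchecked("abc");  // loader runs, returns 3
disabled.getUnchecked("abc");  // loader runs again; nothing was cached
```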
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java
new file mode 100644
index 000000000000000..a2cb05d82c5556c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java
@@ -0,0 +1,466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
+import com.google.common.cache.AbstractLoadingCache;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheLoader.UnsupportedLoadingOperationException;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalCause;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Cache} and {@link LoadingCache} implementation similar to ones
+ * produced by {@link CacheBuilder#build()}, but one that does not exhibit
+ * <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
+ * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)}
+ * is guaranteed to return fresh state after {@link #invalidate(Object)},
+ * {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called.
+ *
+ * @see EvictableCacheBuilder
+ */
+// @ElementTypesAreNonnullByDefault
+class EvictableCache<K, V>
+ extends AbstractLoadingCache<K, V>
+ implements LoadingCache<K, V> {
+ // Invariant: for every (K, token) entry in the tokens map, there is a live
+ // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens'
+ // entry to be removed.
+ private final ConcurrentHashMap<K, Token<K>> tokens = new ConcurrentHashMap<>();
+ // The dataCache can have entries with no corresponding tokens in the tokens map.
+ // For example, this can happen when invalidation concurs with load.
+ // The dataCache must be bounded.
+ private final LoadingCache<Token<K>, V> dataCache;
+
+ private final AtomicInteger invalidations = new AtomicInteger();
+
+ EvictableCache(CacheBuilder<? super Token<K>, ? super V> cacheBuilder, CacheLoader<? super K, V> cacheLoader) {
+ dataCache = buildUnsafeCache(
+ cacheBuilder
+ .<Token<K>, V>removalListener(removal -> {
+ Token<K> token = removal.getKey();
+ Verify.verify(token != null, "token is null");
+ if (removal.getCause() != RemovalCause.REPLACED) {
+ tokens.remove(token.getKey(), token);
+ }
+ }),
+ new TokenCacheLoader<>(cacheLoader));
+ }
+
+ // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
+ // advising to use this class as a safety-adding wrapper.
+ private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
+ CacheLoader<? super K, V> cacheLoader) {
+ return cacheBuilder.build(cacheLoader);
+ }
+
+ @Override
+ public V getIfPresent(Object key) {
+ @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
 Token<K> token = tokens.get(key);
+ if (token == null) {
+ return null;
+ }
+ return dataCache.getIfPresent(token);
+ }
+
+ @Override
+ public V get(K key, Callable<? extends V> valueLoader)
+ throws ExecutionException {
+ Token<K> newToken = new Token<>(key);
+ int invalidations = this.invalidations.get();
+ Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
+ try {
+ V value = dataCache.get(token, valueLoader);
+ if (invalidations == this.invalidations.get()) {
+ // Revive token if it got expired before reloading
+ if (tokens.putIfAbsent(key, token) == null) {
+ // Revived
+ if (!dataCache.asMap().containsKey(token)) {
+ // We revived, but the token does not correspond to a live entry anymore.
+ // It would stay in tokens forever, so let's remove it.
+ tokens.remove(key, token);
+ }
+ }
+ }
+ return value;
+ } catch (Throwable e) {
+ if (newToken == token) {
+ // Failed to load and it was our new token persisted in tokens map.
+ // No cache entry exists for the token (unless concurrent load happened),
+ // so we need to remove it.
+ tokens.remove(key, newToken);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public V get(K key)
+ throws ExecutionException {
+ Token<K> newToken = new Token<>(key);
+ int invalidations = this.invalidations.get();
+ Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken);
+ try {
+ V value = dataCache.get(token);
+ if (invalidations == this.invalidations.get()) {
+ // Revive token if it got expired before reloading
+ if (tokens.putIfAbsent(key, token) == null) {
+ // Revived
+ if (!dataCache.asMap().containsKey(token)) {
+ // We revived, but the token does not correspond to a live entry anymore.
+ // It would stay in tokens forever, so let's remove it.
+ tokens.remove(key, token);
+ }
+ }
+ }
+ return value;
+ } catch (Throwable e) {
+ if (newToken == token) {
+ // Failed to load and it was our new token persisted in tokens map.
+ // No cache entry exists for the token (unless concurrent load happened),
+ // so we need to remove it.
+ tokens.remove(key, newToken);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
+ throws ExecutionException {
+ List<Token<K>> newTokens = new ArrayList<>();
+ List<Token<K>> temporaryTokens = new ArrayList<>();
+ try {
+ Map<K, V> result = new LinkedHashMap<>();
+ for (K key : keys) {
+ if (result.containsKey(key)) {
+ continue;
+ }
+ // This is not bulk, but is fast local operation
+ Token<K> newToken = new Token<>(key);
+ Token<K> oldToken = tokens.putIfAbsent(key, newToken);
+ if (oldToken != null) {
+ // Token exists but a data may not exist (e.g. due to concurrent eviction)
+ V value = dataCache.getIfPresent(oldToken);
+ if (value != null) {
+ result.put(key, value);
+ continue;
+ }
+ // Old token exists but value wasn't found. This can happen when there is concurrent
+ // eviction/invalidation or when the value is still being loaded.
+ // The new token is not registered in tokens, so won't be used by subsequent invocations.
+ temporaryTokens.add(newToken);
+ }
+ newTokens.add(newToken);
+ }
+
+ Map<Token<K>, V> values = dataCache.getAll(newTokens);
+ for (Map.Entry<Token<K>, V> entry : values.entrySet()) {
+ Token<K> newToken = entry.getKey();
+ result.put(newToken.getKey(), entry.getValue());
+ }
+ return ImmutableMap.copyOf(result);
+ } catch (Throwable e) {
+ for (Token<K> token : newTokens) {
+ // Failed to load and it was our new token (potentially) persisted in tokens map.
+ // No cache entry exists for the token (unless concurrent load happened),
+ // so we need to remove it.
+ tokens.remove(token.getKey(), token);
+ }
+ throw e;
+ } finally {
+ dataCache.invalidateAll(temporaryTokens);
+ }
+ }
+
+ @Override
+ public void refresh(K key) {
+ // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token.
+ // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created.
+ // In such case we would leak the newly created token.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long size() {
+ return dataCache.size();
+ }
+
+ @Override
+ public void cleanUp() {
+ dataCache.cleanUp();
+ }
+
+ @VisibleForTesting
+ int tokensCount() {
+ return tokens.size();
+ }
+
+ @Override
+ public void invalidate(Object key) {
+ invalidations.incrementAndGet();
+ @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
 Token<K> token = tokens.remove(key);
+ if (token != null) {
+ dataCache.invalidate(token);
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ invalidations.incrementAndGet();
+ dataCache.invalidateAll();
+ tokens.clear();
+ }
+
+ // Not thread safe, test only.
+ @VisibleForTesting
+ void clearDataCacheOnly() {
+ Map<K, Token<K>> tokensCopy = new HashMap<>(tokens);
+ dataCache.asMap().clear();
+ Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction");
+ tokens.putAll(tokensCopy);
+ }
+
+ @Override
+ public CacheStats stats() {
+ return dataCache.stats();
+ }
+
+ @Override
+ public ConcurrentMap<K, V> asMap() {
+ return new ConcurrentMap<K, V>() {
+ private final ConcurrentMap<Token<K>, V> dataCacheMap = dataCache.asMap();
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ throw new UnsupportedOperationException("The operation is not supported,"
+ + " as in inherently races with cache invalidation");
+ }
+
+ @Override
+ public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ // default implementation of ConcurrentMap#compute uses not supported putIfAbsent in some cases
+ throw new UnsupportedOperationException("The operation is not supported, as in inherently"
+ + " races with cache invalidation");
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K
 Token<K> token = tokens.get(key);
+ if (token != null) {
+ return dataCacheMap.remove(token, value);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
 Token<K> token = tokens.get(key);
+ if (token != null) {
+ return dataCacheMap.replace(token, oldValue, newValue);
+ }
+ return false;
+ }
+
+ @Override
+ public V replace(K key, V value) {
+ throw new UnsupportedOperationException("The operation is not supported, as in inherently races"
+ + " with cache invalidation");
+ }
+
+ @Override
+ public int size() {
+ return dataCache.asMap().size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return dataCache.asMap().isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return tokens.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return values().contains(value);
+ }
+
+ @Override
+ @Nullable
+ public V get(Object key) {
+ return getIfPresent(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ throw new UnsupportedOperationException("The operation is not supported, as in inherently"
+ + " races with cache invalidation. Use get(key, callable) instead.");
+ }
+
+ @Override
+ @Nullable
+ public V remove(Object key) {
 Token<K> token = tokens.remove(key);
+ if (token != null) {
+ return dataCacheMap.remove(token);
+ }
+ return null;
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ throw new UnsupportedOperationException("The operation is not supported, as in inherently"
+ + " races with cache invalidation. Use get(key, callable) instead.");
+ }
+
+ @Override
+ public void clear() {
+ dataCacheMap.clear();
+ tokens.clear();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return tokens.keySet();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return dataCacheMap.values();
+ }
+
+ @Override
+ public Set<Map.Entry<K, V>> entrySet() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ // instance-based equality
+ static final class Token<K> {
+ private final K key;
+
+ Token(K key) {
+ this.key = Objects.requireNonNull(key, "key is null");
+ }
+
+ K getKey() {
+ return key;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key);
+ }
+ }
+
+ private static class TokenCacheLoader<K, V>
+ extends CacheLoader<Token<K>, V> {
+ private final CacheLoader<? super K, V> delegate;
+
+ public TokenCacheLoader(CacheLoader<? super K, V> delegate) {
+ this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+ }
+
+ @Override
+ public V load(Token<K> token)
+ throws Exception {
+ return delegate.load(token.getKey());
+ }
+
+ @Override
+ public ListenableFuture<V> reload(Token<K> token, V oldValue)
+ throws Exception {
+ return delegate.reload(token.getKey(), oldValue);
+ }
+
+ @Override
+ public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens)
+ throws Exception {
+ List<Token<K>> tokenList = ImmutableList.copyOf(tokens);
+ List<K> keys = new ArrayList<>();
+ for (Token<K> token : tokenList) {
+ keys.add(token.getKey());
+ }
+ Map<? super K, V> values;
+ try {
+ values = delegate.loadAll(keys);
+ } catch (UnsupportedLoadingOperationException e) {
+ // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll
+ // to fall back from bulk loading (without load sharing) to loading individual
+ // values (with load sharing). EvictableCache implementation does not currently
+ // support the fallback mechanism, so the individual values would be loaded
+ // without load sharing. This would be an unintentional and non-obvious behavioral
+ // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled.
+ throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache"
+ + " when CacheLoader.loadAll is not implemented", e);
+ }
+
+ ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder();
+ for (int i = 0; i < tokenList.size(); i++) {
+ Token<K> token = tokenList.get(i);
+ K key = keys.get(i);
+ V value = values.get(key);
+ // CacheLoader.loadAll is not guaranteed to return values for all the keys
+ if (value != null) {
+ result.put(token, value);
+ }
+ }
+ return result.buildOrThrow();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .addValue(delegate)
+ .toString();
+ }
+ }
+}
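Reviewer note: the Token indirection is the whole point of this class. invalidate() swaps the token out from under any in-flight load, so a racing load can at worst populate an orphaned dataCache entry that no future read can see. A sketch of the resulting guarantee (types illustrative):

```java
// Hypothetical demonstration: after invalidate(key), get() can never be served
// by a load that started before the invalidation, because the old Token (the
// real dataCache key) was discarded and get() mints a fresh one.
static String demo() throws ExecutionException {
    Cache<String, String> cache = EvictableCacheBuilder.newBuilder()
            .maximumSize(10_000)
            .build();
    cache.get("k", () -> "v1");         // first load goes through token T1
    cache.invalidate("k");              // T1 dropped; entries under T1 become unreachable
    return cache.get("k", () -> "v2");  // fresh token T2 forces a reload: "v2"
}
```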
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java
new file mode 100644
index 000000000000000..8da3f8c8d5602eb
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCacheBuilder.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.EvictableCache.Token;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import com.google.errorprone.annotations.CheckReturnValue;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder},
+ * but creating cache implementations that do not exhibit
+ * <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>:
+ * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)}
+ * is guaranteed to return fresh state after {@link Cache#invalidate(Object)},
+ * {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called.
+ */
+public final class EvictableCacheBuilder<K, V> {
+ @CheckReturnValue
+ public static EvictableCacheBuilder<Object, Object> newBuilder() {
+ return new EvictableCacheBuilder<>();
+ }
+
+ private Optional<Ticker> ticker = Optional.empty();
+ private Optional<Duration> expireAfterWrite = Optional.empty();
+ private Optional<Duration> refreshAfterWrite = Optional.empty();
+ private Optional<Long> maximumSize = Optional.empty();
+ private Optional<Long> maximumWeight = Optional.empty();
+ private Optional<Integer> concurrencyLevel = Optional.empty();
+ private Optional<Weigher<? super Token<K>, ? super V>> weigher = Optional.empty();
+ private boolean recordStats;
+ private Optional<DisabledCacheImplementation> disabledCacheImplementation = Optional.empty();
+
+ private EvictableCacheBuilder() {}
+
+ /**
+ * Pass-through for {@link CacheBuilder#ticker(Ticker)}.
+ */
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> ticker(Ticker ticker) {
+ this.ticker = Optional.of(ticker);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
+ return expireAfterWrite(toDuration(duration, unit));
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> expireAfterWrite(Duration duration) {
+ Preconditions.checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set");
+ this.expireAfterWrite = Optional.of(duration);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
+ return refreshAfterWrite(toDuration(duration, unit));
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> refreshAfterWrite(Duration duration) {
+ Preconditions.checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set");
+ this.refreshAfterWrite = Optional.of(duration);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> maximumSize(long maximumSize) {
+ Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set");
+ Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set");
+ this.maximumSize = Optional.of(maximumSize);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> maximumWeight(long maximumWeight) {
+ Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set");
+ Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set");
+ this.maximumWeight = Optional.of(maximumWeight);
+ return this;
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) {
+ Preconditions.checkState(!this.concurrencyLevel.isPresent(), "concurrencyLevel already set");
+ this.concurrencyLevel = Optional.of(concurrencyLevel);
+ return this;
+ }
+
+ public <K1 extends K, V1 extends V> EvictableCacheBuilder<K1, V1> weigher(Weigher<? super K1, ? super V1> weigher) {
+ Preconditions.checkState(!this.weigher.isPresent(), "weigher already set");
+ @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher
+ EvictableCacheBuilder<K1, V1> cast = (EvictableCacheBuilder<K1, V1>) this;
+ cast.weigher = Optional.of(new TokenWeigher<>(weigher));
+ return cast;
+ }
+
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> recordStats() {
+ recordStats = true;
+ return this;
+ }
+
+ /**
+ * Choose a behavior for case when caching is disabled that may allow data and failure
+ * sharing between concurrent callers.
+ */
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> shareResultsAndFailuresEvenIfDisabled() {
+ return disabledCacheImplementation(DisabledCacheImplementation.GUAVA);
+ }
+
+ /**
+ * Choose a behavior for case when caching is disabled that prevents data and
+ * failure sharing between concurrent callers.
+ * Note: disabled cache won't report any statistics.
+ */
+ @CanIgnoreReturnValue
+ public EvictableCacheBuilder<K, V> shareNothingWhenDisabled() {
+ return disabledCacheImplementation(DisabledCacheImplementation.NOOP);
+ }
+
+ @VisibleForTesting
+ EvictableCacheBuilder<K, V> disabledCacheImplementation(DisabledCacheImplementation cacheImplementation) {
+ Preconditions.checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set");
+ disabledCacheImplementation = Optional.of(cacheImplementation);
+ return this;
+ }
+
+ @CheckReturnValue
+ public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
+ return build(unimplementedCacheLoader());
+ }
+
+ @CheckReturnValue
+ public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? super K1, V1> loader) {
+ if (cacheDisabled()) {
+ // Silently providing a behavior different from Guava's could be surprising, so require explicit choice.
+ DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow(
+ () -> new IllegalStateException(
+ "Even when cache is disabled, the loads are synchronized and both load results and failures"
+ + " are shared between threads. " + "This is rarely desired, thus builder caller is"
+ + " expected to either opt-in into this behavior with"
+ + " shareResultsAndFailuresEvenIfDisabled(), or choose not to share results (and failures)"
+ + " between concurrent invocations with shareNothingWhenDisabled()."));
+
+ switch (disabledCacheImplementation) {
+ case NOOP:
+ return new EmptyCache<>(loader, recordStats);
+ case GUAVA: {
+ // Disabled cache is always empty, so doesn't exhibit invalidation problems.
+ // Avoid overhead of EvictableCache wrapper.
+ CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
+ .maximumSize(0)
+ .expireAfterWrite(0, TimeUnit.SECONDS);
+ if (recordStats) {
+ cacheBuilder.recordStats();
+ }
+ return buildUnsafeCache(cacheBuilder, loader);
+ }
+ default:
+ throw new IllegalStateException("Unexpected value: " + disabledCacheImplementation);
+ }
+ }
+
+ if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) {
+ // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may
+ // lead to an entry remaining in the cache, without associated token. This would lead to
+ // a memory leak in an unbounded cache.
+ throw new IllegalStateException("Unbounded cache is not supported");
+ }
+
+ // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls.
+ CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
+ ticker.ifPresent(cacheBuilder::ticker);
+ expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite);
+ refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite);
+ maximumSize.ifPresent(cacheBuilder::maximumSize);
+ maximumWeight.ifPresent(cacheBuilder::maximumWeight);
+ weigher.ifPresent(cacheBuilder::weigher);
+ concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel);
+ if (recordStats) {
+ cacheBuilder.recordStats();
+ }
+ return new EvictableCache<>(cacheBuilder, loader);
+ }
+
+ private boolean cacheDisabled() {
+ return (maximumSize.isPresent() && maximumSize.get() == 0)
+ || (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero());
+ }
+
+ // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden,
+ // advising to use this class as a safety-adding wrapper.
+ private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder,
+ CacheLoader<? super K, V> cacheLoader) {
+ return cacheBuilder.build(cacheLoader);
+ }
+
+ private static <K, V> CacheLoader<K, V> unimplementedCacheLoader() {
+ return CacheLoader.from(ignored -> {
+ throw new UnsupportedOperationException();
+ });
+ }
+
+ private static final class TokenWeigher<K, V>
+ implements Weigher<Token<K>, V> {
+ private final Weigher<? super K, ? super V> delegate;
+
+ private TokenWeigher(Weigher<? super K, ? super V> delegate) {
+ this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+ }
+
+ @Override
+ public int weigh(Token<K> key, V value) {
+ return delegate.weigh(key.getKey(), value);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TokenWeigher<?, ?> that = (TokenWeigher<?, ?>) o;
+ return Objects.equals(delegate, that.delegate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(delegate);
+ }
+
+ @Override
+ public String toString() {
+ return "TokenWeigher{" + "delegate=" + delegate + '}';
+ }
+ }
+
+ private static Duration toDuration(long duration, TimeUnit unit) {
+ // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated
+ return Duration.ofNanos(unit.toNanos(duration));
+ }
+
+ @VisibleForTesting
+ enum DisabledCacheImplementation {
+ NOOP,
+ GUAVA,
+ }
+}
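Reviewer note: build() rejects unbounded caches because an invalidation racing with a load can strand a token-less entry, so one of maximumSize, maximumWeight, or expireAfterWrite must be set. A weight-bounded sketch in the spirit of this PR's split file cache; the loader and the listLocation() helper are hypothetical:

```java
// Hypothetical: bound a listing cache by total file count rather than key count,
// sized by the config added at the top of this PR.
LoadingCache<String, List<RemoteFile>> listings = EvictableCacheBuilder.newBuilder()
        .<String, List<RemoteFile>>weigher((location, files) -> files.size())
        .maximumWeight(Config.max_external_table_split_file_meta_cache_num)
        .shareNothingWhenDisabled()
        .build(CacheLoader.from((String location) -> listLocation(location)));
```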
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index a0558766e814002..9fa502be42b962c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -51,7 +51,7 @@ public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) {
}
private void init(ExecutorService executor) {
- CacheFactory schemaCacheFactory = new CacheFactory(
+ CacheFactory<SchemaCacheKey, Optional<SchemaCacheValue>> schemaCacheFactory = new CacheFactory<>(
OptionalLong.of(86400L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
Config.max_external_schema_cache_num,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index aacd9268ae35cfc..1d652c1e1103503 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -33,6 +33,7 @@
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
@@ -952,7 +953,8 @@ private List<FileCacheValue> getFilesForPartitions(
LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString());
}
}
- return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName);
+ return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName,
+ new FileSystemDirectoryLister(), null);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index ea42dfa2f52a01d..765a1920fe66fbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -25,6 +25,7 @@
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CacheFactory;
@@ -39,7 +40,10 @@
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.fs.FileSystemCache;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.RemoteIterator;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -81,6 +85,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -138,7 +144,7 @@ public HiveMetaStoreCache(HMSExternalCatalog catalog,
* which will bring out thread deadlock.
**/
private void init() {
- CacheFactory partitionValuesCacheFactory = new CacheFactory(
+ CacheFactory<PartitionValueCacheKey, HivePartitionValues> partitionValuesCacheFactory = new CacheFactory<>(
OptionalLong.of(28800L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
Config.max_hive_partition_table_cache_num,
@@ -195,7 +201,7 @@ protected ExecutorService getExecutor() {
@Override
public FileCacheValue load(FileCacheKey key) {
- return loadFiles(key);
+ return loadFiles(key, new FileSystemDirectoryLister(), null);
}
};
@@ -348,7 +354,9 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
private FileCacheValue getFileCache(String location, String inputFormat,
JobConf jobConf,
List<String> partitionValues,
- String bindBrokerName) throws UserException {
+ String bindBrokerName,
+ DirectoryLister directoryLister,
+ TableIf table) throws UserException {
FileCacheValue result = new FileCacheValue();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
@@ -363,17 +371,18 @@ private FileCacheValue getFileCache(String location, String inputFormat,
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to recursively list data location.
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
- List<RemoteFile> remoteFiles = new ArrayList<>();
boolean isRecursiveDirectories = Boolean.valueOf(
catalog.getProperties().getOrDefault("hive.recursive_directories", "false"));
- Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles);
- if (status.ok()) {
- for (RemoteFile remoteFile : remoteFiles) {
+ try {
+ RemoteIterator<RemoteFile> iterator = directoryLister.listFiles(fs, isRecursiveDirectories,
+ table, location);
+ while (iterator.hasNext()) {
+ RemoteFile remoteFile = iterator.next();
String srcPath = remoteFile.getPath().toString();
LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
result.addFile(remoteFile, locationPath);
}
- } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
+ } catch (FileNotFoundException e) {
// User may manually remove partition under HDFS, in this case,
// Hive doesn't aware that the removed partition is missing.
// Here is to support this case without throw an exception.
@@ -382,15 +391,15 @@ private FileCacheValue getFileCache(String location, String inputFormat,
.getOrDefault("hive.ignore_absent_partitions", "true"))) {
throw new UserException("Partition location does not exist: " + location);
}
- } else {
- throw new RuntimeException(status.getErrMsg());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
// Must copy the partitionValues to avoid concurrent modification of key and value
result.setPartitionValues(Lists.newArrayList(partitionValues));
return result;
}
- private FileCacheValue loadFiles(FileCacheKey key) {
+ private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
@@ -415,7 +424,7 @@ private FileCacheValue loadFiles(FileCacheKey key) {
FileInputFormat.setInputPaths(jobConf, finalLocation.get());
try {
FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
- key.getPartitionValues(), key.bindBrokerName);
+ key.getPartitionValues(), key.bindBrokerName, directoryLister, table);
// Replace default hive partition with a null_string.
for (int i = 0; i < result.getValuesSize(); i++) {
if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) {
@@ -469,19 +478,25 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
}
public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
- String bindBrokerName) {
- return getFilesByPartitions(partitions, true, true, bindBrokerName);
+ String bindBrokerName,
+ DirectoryLister directoryLister,
+ TableIf table) {
+ return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table);
}
public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
- String bindBrokerName) {
- return getFilesByPartitions(partitions, false, true, bindBrokerName);
+ String bindBrokerName,
+ DirectoryLister directoryLister,
+ TableIf table) {
+ return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table);
}
public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
boolean withCache,
boolean concurrent,
- String bindBrokerName) {
+ String bindBrokerName,
+ DirectoryLister directoryLister,
+ TableIf table) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(
@@ -497,13 +512,15 @@ public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
} else {
if (concurrent) {
List<Future<FileCacheValue>> pList = keys.stream().map(
- key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
+ key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table)))
+ .collect(Collectors.toList());
fileLists = Lists.newArrayListWithExpectedSize(keys.size());
for (Future<FileCacheValue> p : pList) {
fileLists.add(p.get());
}
} else {
- fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
+ fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table))
+ .collect(Collectors.toList());
}
}
} catch (ExecutionException e) {
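Reviewer note: all listing paths now thread a DirectoryLister plus the owning table down to getFileCache(), which is the seam that lets a query-scoped caching lister be swapped in. A hypothetical call site matching the new signatures:

```java
// Callers wanting the old direct-listing behavior pass the plain filesystem lister.
List<FileCacheValue> fileCaches = cache.getFilesByPartitionsWithCache(
        partitions, bindBrokerName, new FileSystemDirectoryLister(), hmsTable);
```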
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index e710bdb935d7bc7..2af13e547fd016d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -41,6 +41,7 @@
import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
+import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@@ -84,6 +85,8 @@ public class HiveScanNode extends FileQueryScanNode {
@Setter
private SelectedPartitions selectedPartitions = null;
+ private DirectoryLister directoryLister;
+
private boolean partitionInit = false;
private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
private List<HivePartition> prunedPartitions;
@@ -98,17 +101,21 @@ public class HiveScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
- public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
+ DirectoryLister directoryLister) {
super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
+ this.directoryLister = directoryLister;
}
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
- StatisticalType statisticalType, boolean needCheckColumnPriv) {
+ StatisticalType statisticalType, boolean needCheckColumnPriv,
+ DirectoryLister directoryLister) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
+ this.directoryLister = directoryLister;
}
@Override
@@ -269,7 +276,8 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
boolean withCache = Config.max_external_file_cache_num > 0;
- fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName);
+ fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName,
+ directoryLister, hmsTable);
}
if (tableSample != null) {
List<FileCacheValue> hiveFileStatuses = selectFiles(fileCaches);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index a8f2a362bfde8d0..3f7870ff2e3c0b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -34,6 +34,7 @@
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@@ -121,8 +122,9 @@ public class HudiScanNode extends HiveScanNode {
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
- Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) {
- super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
+ Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation,
+ DirectoryLister directoryLister) {
+ super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, directoryLister);
isCowOrRoTable = hmsTable.isHoodieCowTable();
if (LOG.isDebugEnabled()) {
if (isCowOrRoTable) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index c1ac2a79754b796..50aad3e9db74320 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -53,7 +53,7 @@ public class IcebergMetadataCache {
private final LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
public IcebergMetadataCache(ExecutorService executor) {
- CacheFactory snapshotListCacheFactory = new CacheFactory(
+ CacheFactory<IcebergMetadataCacheKey, List<Snapshot>> snapshotListCacheFactory = new CacheFactory<>(
OptionalLong.of(28800L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
Config.max_external_table_cache_num,
@@ -61,7 +61,7 @@ public IcebergMetadataCache(ExecutorService executor) {
null);
this.snapshotListCache = snapshotListCacheFactory.buildCache(key -> loadSnapshots(key), null, executor);
- CacheFactory tableCacheFactory = new CacheFactory(
+ CacheFactory<IcebergMetadataCacheKey, Table> tableCacheFactory = new CacheFactory<>(
OptionalLong.of(28800L),
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
Config.max_external_table_cache_num,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java
new file mode 100644
index 000000000000000..765c5ab6863d44e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.io.IOException;
+
+public interface DirectoryLister {
+ RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+ throws IOException;
+}
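Reviewer note: DirectoryLister is deliberately minimal so listings can be decorated. A sketch of a memoizing decorator in the spirit of Trino's TransactionScopeCachingDirectoryLister (whose cache key type appears at the end of this diff); the unbounded map is for brevity only, a real implementation would bound it:

```java
// Hypothetical decorator: caches each location's listing and replays it.
public class CachingDirectoryLister implements DirectoryLister {
    private final DirectoryLister delegate;
    private final ConcurrentHashMap<String, List<RemoteFile>> cache = new ConcurrentHashMap<>();

    public CachingDirectoryLister(DirectoryLister delegate) {
        this.delegate = delegate;
    }

    @Override
    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
            throws IOException {
        List<RemoteFile> cached = cache.get(location);
        if (cached == null) {
            List<RemoteFile> files = new ArrayList<>();
            RemoteIterator<RemoteFile> it = delegate.listFiles(fs, recursive, table, location);
            while (it.hasNext()) {
                files.add(it.next());  // drain the delegate once
            }
            cache.put(location, files);
            cached = files;
        }
        return new RemoteFileRemoteIterator(cached);
    }
}
```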
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index e96258dc719fbd2..80a76dc4eda11f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -36,7 +36,7 @@ public class FileSystemCache {
public FileSystemCache() {
// no need to set refreshAfterWrite, because the FileSystem is created once and never changed
- CacheFactory fsCacheFactory = new CacheFactory(
+ CacheFactory<FileSystemCacheKey, RemoteFileSystem> fsCacheFactory = new CacheFactory<>(
OptionalLong.of(86400L),
OptionalLong.empty(),
Config.max_remote_file_system_cache_num,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java
new file mode 100644
index 000000000000000..ae8371cdbfa0ea6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileSystemDirectoryLister implements DirectoryLister {
+ public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+ throws IOException {
+ List<RemoteFile> result = new ArrayList<>();
+ Status status = fs.listFiles(location, recursive, result);
+ if (!status.ok()) {
+ throw new IOException(status.getErrMsg());
+ }
+ return new RemoteFileRemoteIterator(result);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java
new file mode 100644
index 000000000000000..2cc4b42557aa4e6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+public class RemoteFileRemoteIterator
+ implements RemoteIterator<RemoteFile> {
+ private final List<RemoteFile> remoteFileList;
+ private int currentIndex = 0;
+
+ public RemoteFileRemoteIterator(List<RemoteFile> remoteFileList) {
+ this.remoteFileList = Objects.requireNonNull(remoteFileList, "remoteFileList is null");
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return currentIndex < remoteFileList.size();
+ }
+
+ @Override
+ public RemoteFile next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator");
+ }
+ return remoteFileList.get(currentIndex++);
+ }
+}
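Reviewer note: unlike java.util.Iterator, this iterator lets hasNext()/next() throw IOException, which is how getFileCache() above detects a vanished partition directory. Draining one back into a list (files is a hypothetical List<RemoteFile>):

```java
List<RemoteFile> drained = new ArrayList<>();
RemoteIterator<RemoteFile> it = new RemoteFileRemoteIterator(files);
while (it.hasNext()) {           // may throw IOException
    drained.add(it.next());
}
```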
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java
new file mode 100644
index 000000000000000..b398cbc1d3fd4f7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import java.io.IOException;
+
+public interface RemoteIterator<T> {
+ boolean hasNext() throws IOException;
+
+ T next() throws IOException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java
new file mode 100644
index 000000000000000..b19130dfbb48eef
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.fs.remote.RemoteFile;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Objects;
+
+class SimpleRemoteIterator implements RemoteIterator<RemoteFile> {
+ private final Iterator<RemoteFile> iterator;
+
+ public SimpleRemoteIterator(Iterator<RemoteFile> iterator) {
+ this.iterator = Objects.requireNonNull(iterator, "iterator is null");
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public RemoteFile next() throws IOException {
+ return iterator.next();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
new file mode 100644
index 000000000000000..6be3c03f824d048
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import java.util.Objects;
+
+public class TransactionDirectoryListingCacheKey {
+
+ private final long transactionId;
+ private final String path;
+
+ public TransactionDirectoryListingCacheKey(long transactionId, String path) {
+ this.transactionId = transactionId;
+ this.path = Objects.requireNonNull(path, "path is null");
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o;
+ return transactionId == that.transactionId && path.equals(that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(transactionId, path);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("TransactionDirectoryListingCacheKey{");
+ sb.append("transactionId=").append(transactionId);
+ sb.append(", path='").append(path).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
new file mode 100644
index 000000000000000..841cf61301798c4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.commons.collections.ListUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+
+/**
+ * Caches directory content (including listings that were started concurrently).
+ * {@link TransactionScopeCachingDirectoryLister} assumes that all listings
+ * are performed by the same user within a single transaction, therefore any failure can
+ * be shared between concurrent listings.
+ */
+public class TransactionScopeCachingDirectoryLister implements DirectoryLister {
+ private final long transactionId;
+
+ @VisibleForTesting
+ public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> getCache() {
+ return cache;
+ }
+
+ //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+ // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+ private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache;
+ private final DirectoryLister delegate;
+
+ public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId,
+ Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache) {
+ this.delegate = Objects.requireNonNull(delegate, "delegate is null");
+ this.transactionId = transactionId;
+ this.cache = Objects.requireNonNull(cache, "cache is null");
+ }
+
+ @Override
+ public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+ throws IOException {
+ return listInternal(fs, recursive, table, new TransactionDirectoryListingCacheKey(transactionId, location));
+ }
+
+ private RemoteIterator<RemoteFile> listInternal(FileSystem fs, boolean recursive, TableIf table,
+ TransactionDirectoryListingCacheKey cacheKey) throws IOException {
+ FetchingValueHolder cachedValueHolder;
+ try {
+ cachedValueHolder = cache.get(cacheKey,
+ () -> new FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, cacheKey)));
+ } catch (ExecutionException | UncheckedExecutionException e) {
+ Throwable throwable = e.getCause();
+ Throwables.throwIfInstanceOf(throwable, IOException.class);
+ Throwables.throwIfUnchecked(throwable);
+ throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable);
+ }
+
+ if (cachedValueHolder.isFullyCached()) {
+ return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles());
+ }
+
+ return cachingRemoteIterator(cachedValueHolder, cacheKey);
+ }
+
+ private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem fs, boolean recursive,
+ TableIf table, TransactionDirectoryListingCacheKey cacheKey)
+ throws IOException {
+ return delegate.listFiles(fs, recursive, table, cacheKey.getPath());
+ }
+
+ private RemoteIterator<RemoteFile> cachingRemoteIterator(FetchingValueHolder cachedValueHolder,
+ TransactionDirectoryListingCacheKey cacheKey) {
+ return new RemoteIterator<RemoteFile>() {
+ private int fileIndex;
+
+ @Override
+ public boolean hasNext()
+ throws IOException {
+ try {
+ boolean hasNext = cachedValueHolder.getCachedFile(fileIndex).isPresent();
+ // Update cache weight of cachedValueHolder for a given path.
+ // The cachedValueHolder acts as an invalidation guard.
+ // If a cache invalidation happens while this iterator goes over the files from the specified path,
+ // the eventually outdated file listing will not be added anymore to the cache.
+ cache.asMap().replace(cacheKey, cachedValueHolder, cachedValueHolder);
+ return hasNext;
+ } catch (Exception exception) {
+ // invalidate cached value to force retry of directory listing
+ cache.invalidate(cacheKey);
+ throw exception;
+ }
+ }
+
+ @Override
+ public RemoteFile next()
+ throws IOException {
+ // force cache entry weight update in case next file is cached
+ Preconditions.checkState(hasNext());
+ return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new);
+ }
+ };
+ }
+
+ @VisibleForTesting
+ boolean isCached(String location) {
+ return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
+ }
+
+ @VisibleForTesting
+ boolean isCached(TransactionDirectoryListingCacheKey cacheKey) {
+ FetchingValueHolder cached = cache.getIfPresent(cacheKey);
+ return cached != null && cached.isFullyCached();
+ }
+
+ static class FetchingValueHolder {
+
+ private final List<RemoteFile> cachedFiles = ListUtils.synchronizedList(new ArrayList<>());
+
+ @GuardedBy("this")
+ @Nullable
+ private RemoteIterator<RemoteFile> fileIterator;
+ @GuardedBy("this")
+ @Nullable
+ private Exception exception;
+
+ public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) {
+ this.fileIterator = Objects.requireNonNull(fileIterator, "fileIterator is null");
+ }
+
+ public synchronized boolean isFullyCached() {
+ return fileIterator == null && exception == null;
+ }
+
+ public long getCacheFileCount() {
+ return cachedFiles.size();
+ }
+
+ public Iterator<RemoteFile> getCachedFiles() {
+ Preconditions.checkState(isFullyCached());
+ return cachedFiles.iterator();
+ }
+
+ public Optional<RemoteFile> getCachedFile(int index)
+ throws IOException {
+ int filesSize = cachedFiles.size();
+ Preconditions.checkArgument(index >= 0 && index <= filesSize,
+ "File index (%s) out of bounds [0, %s]", index, filesSize);
+
+ // avoid fileIterator synchronization (and thus blocking) for already cached files
+ if (index < filesSize) {
+ return Optional.of(cachedFiles.get(index));
+ }
+
+ return fetchNextCachedFile(index);
+ }
+
+ private synchronized Optional<RemoteFile> fetchNextCachedFile(int index)
+ throws IOException {
+ if (exception != null) {
+ throw new IOException("Exception while listing directory", exception);
+ }
+
+ if (index < cachedFiles.size()) {
+ // file was fetched concurrently
+ return Optional.of(cachedFiles.get(index));
+ }
+
+ try {
+ if (fileIterator == null || !fileIterator.hasNext()) {
+ // no more files
+ fileIterator = null;
+ return Optional.empty();
+ }
+
+ RemoteFile fileStatus = fileIterator.next();
+ cachedFiles.add(fileStatus);
+ return Optional.of(fileStatus);
+ } catch (Exception exception) {
+ fileIterator = null;
+ this.exception = exception;
+ throw exception;
+ }
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
new file mode 100644
index 000000000000000..c3c9c347c3d2b65
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.common.EvictableCacheBuilder;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder;
+
+import com.google.common.cache.Cache;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TransactionScopeCachingDirectoryListerFactory {
+ //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
+ // to deal more efficiently with cache invalidation scenarios for partitioned tables.
+ private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;
+
+ private final AtomicLong nextTransactionId = new AtomicLong();
+
+ public TransactionScopeCachingDirectoryListerFactory(long maxSize) {
+ if (maxSize > 0) {
+ EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> cacheBuilder =
+ EvictableCacheBuilder.newBuilder()
+ .maximumWeight(maxSize)
+ .weigher((key, value) ->
+ Math.toIntExact(value.getCacheFileCount()));
+ this.cache = Optional.of(cacheBuilder.build());
+ } else {
+ cache = Optional.empty();
+ }
+ }
+
+ public DirectoryLister get(DirectoryLister delegate) {
+ return cache
+ .map(cache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate,
+ nextTransactionId.getAndIncrement(), cache))
+ .orElse(delegate);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 654ccc8ca1155aa..cc8b8101be94d81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -46,6 +46,7 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.es.EsExternalTable;
@@ -68,6 +69,9 @@
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecAny;
@@ -241,6 +245,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
List<Slot> slots = fileScan.getOutput();
ExternalTable table = fileScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
@@ -648,7 +664,7 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla
+ " for Hudi table");
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
- hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
+ hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), directoryLister);
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index d94ad0a2552240f..f69c8ac79641448 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -58,6 +58,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
@@ -74,6 +75,9 @@
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.fs.FileSystemDirectoryLister;
+import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.statistics.StatisticalType;
@@ -115,6 +119,8 @@ public class SingleNodePlanner {
private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
private Map<SelectStmt, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap();
+ private DirectoryLister directoryLister;
+
public SingleNodePlanner(PlannerContext ctx) {
this.ctx = ctx;
}
@@ -1959,6 +1965,11 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId());
break;
case HMS_EXTERNAL_TABLE:
+ // TransactionScopeCachingDirectoryLister is only used in hms external tables.
+ if (directoryLister == null) {
+ this.directoryLister = new TransactionScopeCachingDirectoryListerFactory(
+ Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister());
+ }
TableIf table = tblRef.getDesc().getTable();
switch (((HMSExternalTable) table).getDlaType()) {
case HUDI:
@@ -1968,14 +1979,15 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
throw new UserException("Hudi incremental read is not supported, "
+ "please set enable_nereids_planner = true to enable new optimizer");
}
+
scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
- Optional.empty(), Optional.empty());
+ Optional.empty(), Optional.empty(), directoryLister);
break;
case ICEBERG:
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case HIVE:
- scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+ scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, directoryLister);
((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
break;
default:
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java
new file mode 100644
index 000000000000000..3bfdc73b78e358f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java
@@ -0,0 +1,708 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.EvictableCacheBuilder.DisabledCacheImplementation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheStats;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.gaul.modernizer_maven_annotations.SuppressModernizer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+public class TestEvictableCache {
+ private static class TestingTicker extends Ticker {
+ private volatile long time;
+
+ public TestingTicker() {
+ }
+
+ public long read() {
+ return this.time;
+ }
+
+ public synchronized void increment(long delta, TimeUnit unit) {
+ Preconditions.checkArgument(delta >= 0L, "delta is negative");
+ this.time += unit.toNanos(delta);
+ }
+ }
+
+ private static final int TEST_TIMEOUT_SECONDS = 10;
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testLoad()
+ throws Exception {
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10_000)
+ .build();
+ Assert.assertEquals("abc", cache.get(42, () -> "abc"));
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testEvictBySize()
+ throws Exception {
+ int maximumSize = 10;
+ Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(maximumSize)
+ .build();
+
+ for (int i = 0; i < 10_000; i++) {
+ int value = i * 10;
+ Assert.assertEquals(value, (Object) cache.get(i, () -> value));
+ }
+ cache.cleanUp();
+ Assert.assertEquals(maximumSize, cache.size());
+ Assert.assertEquals(maximumSize, ((EvictableCache<?, ?>) cache).tokensCount());
+
+ // Ensure cache is effective, i.e. some entries preserved
+ int lastKey = 10_000 - 1;
+ Assert.assertEquals(lastKey * 10, (Object) cache.get(lastKey, () -> {
+ throw new UnsupportedOperationException();
+ }));
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testEvictByWeight() throws Exception {
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .maximumWeight(20)
+ .weigher((Integer key, String value) -> value.length())
+ .build();
+
+ for (int i = 0; i < 10; i++) {
+ String value = String.join("", Collections.nCopies(i, "a"));
+ Assert.assertEquals(value, cache.get(i, () -> value));
+ }
+
+ cache.cleanUp();
+ // It's not deterministic which entries get evicted
+ int cacheSize = Math.toIntExact(cache.size());
+
+ Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount());
+ Assert.assertEquals(cacheSize, cache.asMap().keySet().size());
+
+ int keySum = cache.asMap().keySet().stream()
+ .mapToInt(i -> i)
+ .sum();
+ Assert.assertTrue("key sum should be <= 20", keySum <= 20);
+
+ Assert.assertEquals(cacheSize, cache.asMap().values().size());
+
+ int valuesLengthSum = cache.asMap().values().stream()
+ .mapToInt(String::length)
+ .sum();
+ Assert.assertTrue("values length sum should be <= 20", valuesLengthSum <= 20);
+
+ // Ensure cache is effective, i.e. some entries preserved
+ int lastKey = 9; // 10 - 1
+ String expected = String.join("", Collections.nCopies(lastKey, "a")); // Java 8 substitute for String.repeat
+ Assert.assertEquals(expected, cache.get(lastKey, () -> {
+ throw new UnsupportedOperationException();
+ }));
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testEvictByTime() throws Exception {
+ TestingTicker ticker = new TestingTicker();
+ int ttl = 100;
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .ticker(ticker)
+ .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
+ .build();
+
+ Assert.assertEquals("1 ala ma kota", cache.get(1, () -> "1 ala ma kota"));
+ ticker.increment(ttl, TimeUnit.MILLISECONDS);
+ Assert.assertEquals("2 ala ma kota", cache.get(2, () -> "2 ala ma kota"));
+ cache.cleanUp();
+
+ // First entry should be expired and its token removed
+ int cacheSize = Math.toIntExact(cache.size());
+ Assert.assertEquals(1, cacheSize);
+ Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount());
+ Assert.assertEquals(cacheSize, cache.asMap().keySet().size());
+ Assert.assertEquals(cacheSize, cache.asMap().values().size());
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testPreserveValueLoadedAfterTimeExpiration() throws Exception {
+ TestingTicker ticker = new TestingTicker();
+ int ttl = 100;
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .ticker(ticker)
+ .expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
+ .build();
+ int key = 11;
+
+ Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "11 ala ma kota"));
+ Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+ Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "something else"));
+ Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+ ticker.increment(ttl, TimeUnit.MILLISECONDS);
+ Assert.assertEquals("new value", cache.get(key, () -> "new value"));
+ Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+ Assert.assertEquals("new value", cache.get(key, () -> "something yet different"));
+ Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+
+ Assert.assertEquals(1, cache.size());
+ Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount());
+ Assert.assertEquals(1, cache.asMap().keySet().size());
+ Assert.assertEquals(1, cache.asMap().values().size());
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testReplace() throws Exception {
+ Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10)
+ .build();
+
+ int key = 10;
+ int initialValue = 20;
+ int replacedValue = 21;
+
+ cache.get(key, () -> initialValue);
+
+ Assert.assertTrue("Should successfully replace value", cache.asMap().replace(key, initialValue, replacedValue));
+ Assert.assertEquals("Cache should contain replaced value", replacedValue, cache.getIfPresent(key).intValue());
+
+ Assert.assertFalse("Should not replace when current value is different", cache.asMap().replace(key, initialValue, replacedValue));
+ Assert.assertEquals("Cache should maintain replaced value", replacedValue, cache.getIfPresent(key).intValue());
+
+ Assert.assertFalse("Should not replace non-existent key", cache.asMap().replace(100000, replacedValue, 22));
+ Assert.assertEquals("Cache should only contain original key", ImmutableSet.of(key), cache.asMap().keySet());
+ Assert.assertEquals("Original key should maintain its value", replacedValue, cache.getIfPresent(key).intValue());
+
+ int anotherKey = 13;
+ int anotherInitialValue = 14;
+ cache.get(anotherKey, () -> anotherInitialValue);
+ cache.invalidate(anotherKey);
+
+ Assert.assertFalse("Should not replace after invalidation", cache.asMap().replace(anotherKey, anotherInitialValue, 15));
+ Assert.assertEquals("Cache should only contain original key after invalidation", ImmutableSet.of(key), cache.asMap().keySet());
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testDisabledCache() throws Exception {
+ Exception exception = Assert.assertThrows(IllegalStateException.class, () ->
+ EvictableCacheBuilder.newBuilder()
+ .maximumSize(0)
+ .build());
+
+ Assert.assertEquals("Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. "
+ + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), "
+ + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled().",
+ exception.getMessage());
+
+ testDisabledCache(
+ EvictableCacheBuilder.newBuilder()
+ .maximumSize(0)
+ .shareNothingWhenDisabled()
+ .build());
+
+ testDisabledCache(
+ EvictableCacheBuilder.newBuilder()
+ .maximumSize(0)
+ .shareResultsAndFailuresEvenIfDisabled()
+ .build());
+ }
+
+ private void testDisabledCache(Cache<Integer, Integer> cache) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ int value = i * 10;
+ Assert.assertEquals(value, cache.get(i, () -> value).intValue());
+ }
+
+ cache.cleanUp();
+ Assert.assertEquals(0, cache.size());
+ Assert.assertTrue(cache.asMap().keySet().isEmpty());
+ Assert.assertTrue(cache.asMap().values().isEmpty());
+ }
+
+ private static class CacheStatsAssertions {
+ public static CacheStatsAssertions assertCacheStats(Cache<?, ?> cache) {
+ Objects.requireNonNull(cache, "cache is null");
+ return assertCacheStats(cache::stats);
+ }
+
+ public static CacheStatsAssertions assertCacheStats(Supplier<CacheStats> statsSupplier) {
+ return new CacheStatsAssertions(statsSupplier);
+ }
+
+ private final Supplier<CacheStats> stats;
+
+ private long loads;
+ private long hits;
+ private long misses;
+
+ private CacheStatsAssertions(Supplier<CacheStats> stats) {
+ this.stats = Objects.requireNonNull(stats, "stats is null");
+ }
+
+ public CacheStatsAssertions loads(long value) {
+ this.loads = value;
+ return this;
+ }
+
+ public CacheStatsAssertions hits(long value) {
+ this.hits = value;
+ return this;
+ }
+
+ public CacheStatsAssertions misses(long value) {
+ this.misses = value;
+ return this;
+ }
+
+ public void afterRunning(Runnable runnable) {
+ try {
+ calling(() -> {
+ runnable.run();
+ return null;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public <T> T calling(Callable<T> callable)
+ throws Exception {
+ CacheStats beforeStats = stats.get();
+ T value = callable.call();
+ CacheStats afterStats = stats.get();
+
+ long loadDelta = afterStats.loadCount() - beforeStats.loadCount();
+ long missesDelta = afterStats.missCount() - beforeStats.missCount();
+ long hitsDelta = afterStats.hitCount() - beforeStats.hitCount();
+
+ Assert.assertEquals(loads, loadDelta);
+ Assert.assertEquals(hits, hitsDelta);
+ Assert.assertEquals(misses, missesDelta);
+
+ return value;
+ }
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testLoadStats()
+ throws Exception {
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10_000)
+ .recordStats()
+ .build();
+
+ Assert.assertEquals(new CacheStats(0, 0, 0, 0, 0, 0), cache.stats());
+
+ String value = CacheStatsAssertions.assertCacheStats(cache)
+ .misses(1)
+ .loads(1)
+ .calling(() -> cache.get(42, () -> "abc"));
+ Assert.assertEquals("abc", value);
+
+ value = CacheStatsAssertions.assertCacheStats(cache)
+ .hits(1)
+ .calling(() -> cache.get(42, () -> "xyz"));
+ Assert.assertEquals("abc", value);
+
+ // with equal, but not the same key
+ value = CacheStatsAssertions.assertCacheStats(cache)
+ .hits(1)
+ .calling(() -> cache.get(newInteger(42), () -> "xyz"));
+ Assert.assertEquals("abc", value);
+ }
+
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testLoadFailure()
+ throws Exception {
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(0)
+ .expireAfterWrite(0, TimeUnit.DAYS)
+ .shareResultsAndFailuresEvenIfDisabled()
+ .build();
+ int key = 10;
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ Exchanger<Thread> exchanger = new Exchanger<>();
+ CountDownLatch secondUnblocked = new CountDownLatch(1);
+
+ List<Future<String>> futures = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ boolean first = i == 0;
+ futures.add(executor.submit(() -> {
+ if (!first) {
+ // Wait for the first one to start the call
+ exchanger.exchange(Thread.currentThread(), 10, TimeUnit.SECONDS);
+ // Prove that we are back in RUNNABLE state.
+ secondUnblocked.countDown();
+ }
+ return cache.get(key, () -> {
+ if (first) {
+ Thread secondThread = exchanger.exchange(null, 10, TimeUnit.SECONDS);
+ Assert.assertTrue(secondUnblocked.await(10, TimeUnit.SECONDS));
+ // Wait for the second one to hang inside the cache.get call.
+ long start = System.nanoTime();
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Assert.assertNotEquals(Thread.State.RUNNABLE, secondThread.getState());
+ break;
+ } catch (Exception | AssertionError e) {
+ if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(30)) {
+ throw e;
+ }
+ }
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ throw new RuntimeException("first attempt is poised to fail");
+ }
+ return "success";
+ });
+ }));
+ }
+
+ List<String> results = new ArrayList<>();
+ for (Future<String> future : futures) {
+ try {
+ results.add(future.get());
+ } catch (ExecutionException e) {
+ results.add(e.getCause().toString());
+ }
+ }
+
+ // Note: if this starts to fail, that suggests that Guava implementation changed and NoopCache may be redundant now.
+ String expectedError = "com.google.common.util.concurrent.UncheckedExecutionException: "
+ + "java.lang.RuntimeException: first attempt is poised to fail";
+ Assert.assertEquals(2, results.size());
+ Assert.assertEquals(expectedError, results.get(0));
+ Assert.assertEquals(expectedError, results.get(1));
+ } finally {
+ executor.shutdownNow();
+ Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+
+ @SuppressModernizer
+ private static Integer newInteger(int value) {
+ Integer integer = value;
+ @SuppressWarnings({"UnnecessaryBoxing", "BoxedPrimitiveConstructor", "CachedNumberConstructorCall", "removal"})
+ Integer newInteger = new Integer(value);
+ Assert.assertNotSame(integer, newInteger);
+ return newInteger;
+ }
+
+ /**
+ * Test that the loader is invoked only once for concurrent invocations of {@link Cache#get(Object, Callable)} with equal keys.
+ * This is a behavior of Guava Cache as well. While this is not necessarily desirable behavior (see
+ * https://github.com/trinodb/trino/issues/11067 ),
+ * the test exists primarily to document current state and support discussion, should the current state change.
+ */
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testConcurrentGetWithCallableShareLoad()
+ throws Exception {
+ AtomicInteger loads = new AtomicInteger();
+ AtomicInteger concurrentInvocations = new AtomicInteger();
+
+ Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10_000)
+ .build();
+
+ int threads = 2;
+ int invocationsPerThread = 100;
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ try {
+ CyclicBarrier barrier = new CyclicBarrier(threads);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(executor.submit(() -> {
+ for (int invocation = 0; invocation < invocationsPerThread; invocation++) {
+ int key = invocation;
+ barrier.await(10, TimeUnit.SECONDS);
+ int value = cache.get(key, () -> {
+ loads.incrementAndGet();
+ int invocations = concurrentInvocations.incrementAndGet();
+ Preconditions.checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key");
+ Thread.sleep(1);
+ concurrentInvocations.decrementAndGet();
+ return -key;
+ });
+ Assert.assertEquals(-invocation, value);
+ }
+ return null;
+ }));
+ }
+
+ for (Future<?> future : futures) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+ Assert.assertTrue(
+ String.format(
+ "loads (%d) should be between %d and %d",
+ loads.intValue(),
+ invocationsPerThread,
+ threads * invocationsPerThread - 1),
+ loads.intValue() >= invocationsPerThread && loads.intValue() <= threads * invocationsPerThread - 1);
+ } finally {
+ executor.shutdownNow();
+ Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+
+ enum Invalidation {
+ INVALIDATE_KEY,
+ INVALIDATE_PREDEFINED_KEYS,
+ INVALIDATE_SELECTED_KEYS,
+ INVALIDATE_ALL,
+ /**/;
+ }
+
+ /**
+ * Covers https://github.com/google/guava/issues/1881
+ */
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testInvalidateOngoingLoad()
+ throws Exception {
+ for (Invalidation invalidation : Invalidation.values()) {
+ Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10_000)
+ .build();
+ Integer key = 42;
+
+ CountDownLatch loadOngoing = new CountDownLatch(1);
+ CountDownLatch invalidated = new CountDownLatch(1);
+ CountDownLatch getReturned = new CountDownLatch(1);
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ // thread A
+ Future<String> threadA = executor.submit(() -> {
+ String value = cache.get(key, () -> {
+ loadOngoing.countDown(); // 1
+ Assert.assertTrue(invalidated.await(10, TimeUnit.SECONDS)); // 2
+ return "stale value";
+ });
+ getReturned.countDown(); // 3
+ return value;
+ });
+
+ // thread B
+ Future<String> threadB = executor.submit(() -> {
+ Assert.assertTrue(loadOngoing.await(10, TimeUnit.SECONDS)); // 1
+
+ switch (invalidation) {
+ case INVALIDATE_KEY:
+ cache.invalidate(key);
+ break;
+ case INVALIDATE_PREDEFINED_KEYS:
+ cache.invalidateAll(ImmutableList.of(key));
+ break;
+ case INVALIDATE_SELECTED_KEYS:
+ Set<Integer> keys = cache.asMap().keySet().stream()
+ .filter(foundKey -> (int) foundKey == key)
+ .collect(ImmutableSet.toImmutableSet());
+ cache.invalidateAll(keys);
+ break;
+ case INVALIDATE_ALL:
+ cache.invalidateAll();
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+
+ invalidated.countDown(); // 2
+ // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed.
+ Assert.assertTrue(getReturned.await(10, TimeUnit.SECONDS)); // 3
+
+ return cache.get(key, () -> "fresh value");
+ });
+
+ Assert.assertEquals("stale value", threadA.get());
+ Assert.assertEquals("fresh value", threadB.get());
+ } finally {
+ executor.shutdownNow();
+ Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+ }
+
+ /**
+ * Covers https://github.com/google/guava/issues/1881
+ */
+ @Test
+ @Timeout(TEST_TIMEOUT_SECONDS)
+ public void testInvalidateAndLoadConcurrently()
+ throws Exception {
+ for (Invalidation invalidation : Invalidation.values()) {
+ int[] primes = {2, 3, 5, 7};
+ AtomicLong remoteState = new AtomicLong(1);
+
+ Cache<Integer, Long> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10_000)
+ .build();
+ Integer key = 42;
+ int threads = 4;
+
+ CyclicBarrier barrier = new CyclicBarrier(threads);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ try {
+ List<Future<Void>> futures = IntStream.range(0, threads)
+ .mapToObj(threadNumber -> executor.submit(() -> {
+ // prime the cache
+ Assert.assertEquals(1L, (long) cache.get(key, remoteState::get));
+ int prime = primes[threadNumber];
+
+ barrier.await(10, TimeUnit.SECONDS);
+
+ // modify underlying state
+ remoteState.updateAndGet(current -> current * prime);
+
+ // invalidate
+ switch (invalidation) {
+ case INVALIDATE_KEY:
+ cache.invalidate(key);
+ break;
+ case INVALIDATE_PREDEFINED_KEYS:
+ cache.invalidateAll(ImmutableList.of(key));
+ break;
+ case INVALIDATE_SELECTED_KEYS:
+ Set<Integer> keys = cache.asMap().keySet().stream()
+ .filter(foundKey -> (int) foundKey == key)
+ .collect(ImmutableSet.toImmutableSet());
+ cache.invalidateAll(keys);
+ break;
+ case INVALIDATE_ALL:
+ cache.invalidateAll();
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+
+ // read through cache
+ long current = cache.get(key, remoteState::get);
+ if (current % prime != 0) {
+ throw new AssertionError(String.format("The value read through cache (%s) in thread (%s) is not divisible by (%s)", current, threadNumber, prime));
+ }
+
+ return (Void) null;
+ }))
+ .collect(ImmutableList.toImmutableList());
+
+ for (Future<Void> future : futures) {
+ try {
+ future.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException("Failed to get future value", e);
+ }
+ }
+
+ Assert.assertEquals(2 * 3 * 5 * 7, remoteState.get());
+ Assert.assertEquals(remoteState.get(), (long) cache.get(key, remoteState::get));
+ } finally {
+ executor.shutdownNow();
+ Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+ }
+ }
+
+ @Test
+ public void testPutOnEmptyCacheImplementation() {
+ for (DisabledCacheImplementation disabledCacheImplementation : DisabledCacheImplementation.values()) {
+ Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(0)
+ .disabledCacheImplementation(disabledCacheImplementation)
+ .build();
+ Map<Integer, Integer> cacheMap = cache.asMap();
+
+ int key = 0;
+ int value = 1;
+ Assert.assertNull(cacheMap.put(key, value));
+ Assert.assertNull(cacheMap.put(key, value));
+ Assert.assertNull(cacheMap.putIfAbsent(key, value));
+ Assert.assertNull(cacheMap.putIfAbsent(key, value));
+ }
+ }
+
+ @Test
+ public void testPutOnNonEmptyCacheImplementation() {
+ Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
+ .maximumSize(10)
+ .build();
+ Map<Integer, Integer> cacheMap = cache.asMap();
+
+ int key = 0;
+ int value = 1;
+
+ Exception putException = Assert.assertThrows("put operation should throw UnsupportedOperationException",
+ UnsupportedOperationException.class,
+ () -> cacheMap.put(key, value));
+ Assert.assertEquals(
+ "The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead.",
+ putException.getMessage());
+
+ Exception putIfAbsentException = Assert.assertThrows("putIfAbsent operation should throw UnsupportedOperationException",
+ UnsupportedOperationException.class,
+ () -> cacheMap.putIfAbsent(key, value));
+ Assert.assertEquals(
+ "The operation is not supported, as in inherently races with cache invalidation",
+ putIfAbsentException.getMessage());
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
new file mode 100644
index 000000000000000..32f47b939011ab2
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
+// and modified by Doris
+
+package org.apache.doris.fs;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.fs.remote.RemoteFile;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+// some tests may invalidate the whole cache, thereby affecting other concurrent tests
+@Execution(ExecutionMode.SAME_THREAD)
+public class TransactionScopeCachingDirectoryListerTest {
+ @Test
+ public void testConcurrentDirectoryListing(@Mocked TableIf table)
+ throws IOException {
+ RemoteFile firstFile = new RemoteFile("file:/x/x", true, 1, 1);
+ RemoteFile secondFile = new RemoteFile("file:/x/y", true, 1, 1);
+ RemoteFile thirdFile = new RemoteFile("file:/y/z", true, 1, 1);
+
+ String path1 = "file:/x";
+ String path2 = "file:/y";
+
+ CountingDirectoryLister countingLister = new CountingDirectoryLister(
+ ImmutableMap.of(
+ path1, ImmutableList.of(firstFile, secondFile),
+ path2, ImmutableList.of(thirdFile)));
+
+ TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister)
+ new TransactionScopeCachingDirectoryListerFactory(2).get(countingLister);
+
+ assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+
+ Assert.assertEquals(1, countingLister.getListCount());
+
+ // listing path2 again shouldn't increase listing count
+ Assert.assertTrue(cachingLister.isCached(path2));
+ assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+ Assert.assertEquals(1, countingLister.getListCount());
+
+ // start listing path1 concurrently
+ RemoteIterator<RemoteFile> path1FilesA = cachingLister.listFiles(null, true, table, path1);
+ RemoteIterator<RemoteFile> path1FilesB = cachingLister.listFiles(null, true, table, path1);
+ Assert.assertEquals(2, countingLister.getListCount());
+
+ // list path1 files using both iterators concurrently
+ Assert.assertEquals(firstFile, path1FilesA.next());
+ Assert.assertEquals(firstFile, path1FilesB.next());
+ Assert.assertEquals(secondFile, path1FilesB.next());
+ Assert.assertEquals(secondFile, path1FilesA.next());
+ Assert.assertFalse(path1FilesA.hasNext());
+ Assert.assertFalse(path1FilesB.hasNext());
+ Assert.assertEquals(2, countingLister.getListCount());
+
+ Assert.assertFalse(cachingLister.isCached(path2));
+ assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
+ Assert.assertEquals(3, countingLister.getListCount());
+ }
+
+ @Test
+ public void testConcurrentDirectoryListingException(@Mocked TableIf table)
+ throws IOException {
+ RemoteFile file = new RemoteFile("file:/x/x", true, 1, 1);
+
+ String path = "file:/x";
+
+ CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
+ DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister);
+
+ // start listing path concurrently
+ countingLister.setThrowException(true);
+ RemoteIterator<RemoteFile> filesA = cachingLister.listFiles(null, true, table, path);
+ RemoteIterator<RemoteFile> filesB = cachingLister.listFiles(null, true, table, path);
+ Assert.assertEquals(1, countingLister.getListCount());
+
+ // listing should throw an exception
+ Assert.assertThrows(IOException.class, () -> filesA.hasNext());
+
+ // listing again should succeed
+ countingLister.setThrowException(false);
+ assertFiles(cachingLister.listFiles(null, true, table, path), ImmutableList.of(file));
+ Assert.assertEquals(2, countingLister.getListCount());
+
+ // listing using second concurrently initialized DirectoryLister should fail
+ Assert.assertThrows(IOException.class, () -> filesB.hasNext());
+ }
+
+ private void assertFiles(RemoteIterator<RemoteFile> iterator, List<RemoteFile> expectedFiles)
+ throws IOException {
+ ImmutableList.Builder<RemoteFile> actualFiles = ImmutableList.builder();
+ while (iterator.hasNext()) {
+ actualFiles.add(iterator.next());
+ }
+ Assert.assertEquals(expectedFiles, actualFiles.build());
+ }
+
+ private static class CountingDirectoryLister
+ implements DirectoryLister {
+ private final Map<String, List<RemoteFile>> fileStatuses;
+ private int listCount;
+ private boolean throwException;
+
+ public CountingDirectoryLister(Map<String, List<RemoteFile>> fileStatuses) {
+ this.fileStatuses = Objects.requireNonNull(fileStatuses, "fileStatuses is null");
+ }
+
+ @Override
+ public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
+ throws IOException {
+ // No specific recursive files-only listing implementation
+ listCount++;
+ return throwingRemoteIterator(Objects.requireNonNull(fileStatuses.get(location)), throwException);
+ }
+
+ public void setThrowException(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ public int getListCount() {
+ return listCount;
+ }
+ }
+
+ static RemoteIterator<RemoteFile> throwingRemoteIterator(List<RemoteFile> files, boolean throwException) {
+ return new RemoteIterator<RemoteFile>() {
+ private final Iterator<RemoteFile> iterator = ImmutableList.copyOf(files).iterator();
+
+ @Override
+ public boolean hasNext()
+ throws IOException {
+ if (throwException) {
+ throw new IOException();
+ }
+ return iterator.hasNext();
+ }
+
+ @Override
+ public RemoteFile next() {
+ return iterator.next();
+ }
+ };
+ }
+}