More updates to disk cache cleanup
kiranprakash154 committed Nov 28, 2023
1 parent 52b9dfd commit 5bab65d
Showing 3 changed files with 360 additions and 84 deletions.
138 changes: 84 additions & 54 deletions server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
@@ -42,8 +42,9 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.tier.BytesReferenceSerializer;
import org.opensearch.common.cache.tier.CachePolicyInfoWrapper;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.CacheValue;
import org.opensearch.common.cache.tier.CachingTier;
import org.opensearch.common.cache.tier.DiskTierTookTimePolicy;
import org.opensearch.common.cache.tier.EhCacheDiskCachingTier;
import org.opensearch.common.cache.tier.OnHeapCachingTier;
import org.opensearch.common.cache.tier.OpenSearchOnHeapCache;
@@ -64,7 +65,6 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.search.query.QuerySearchResult;

import java.io.Closeable;
import java.io.IOException;
@@ -174,6 +174,19 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
this.indicesService = indicesService;
}

// added for testing
IndicesRequestCache(
Settings settings,
IndicesService indicesService,
TieredCacheService<Key, BytesReference> tieredCacheService
) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
long sizeInBytes = size.getBytes();
this.indicesService = indicesService;
this.tieredCacheService = tieredCacheService;
}

@Override
public void close() {
tieredCacheService.invalidateAll();
@@ -238,9 +251,10 @@ BytesReference getOrCompute(

/**
* Invalidates the cache entry for the given key and its context
*
* @param cacheEntity the cache entity to invalidate for
* @param reader the reader to invalidate the cache entry for
* @param cacheKey the cache key to invalidate
* @param reader the reader to invalidate the cache entry for
* @param cacheKey the cache key to invalidate
*/
void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
assert reader.getReaderCacheHelper() != null;
@@ -426,83 +440,87 @@ public int hashCode() {
/**
* Logic to clean up in-memory cache.
*/
synchronized void cleanCache() { // TODO rename this method to plural or cleanTieredCache ?
synchronized void cleanCache() {
final Set<CleanupKey> currentKeysToClean = new HashSet<>();
final Set<Object> currentFullClean = new HashSet<>();

Iterator<CleanupKey> iterator = keysToClean.iterator();
while(iterator.hasNext()) {
CleanupKey cleanupKey = iterator.next();
iterator.remove();
if (needsFullClean(cleanupKey)) {
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
currentKeysToClean.add(cleanupKey);
}
}
categorizeKeysForCleanup(currentKeysToClean, currentFullClean);

// Early exit if no cleanup is needed
if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) {
return;
}
cleanTieredCaches(currentKeysToClean, currentFullClean);

cleanUpKeys(
tieredCacheService.getOnHeapCachingTier().keys().iterator(),
currentKeysToClean,
currentFullClean
);
tieredCacheService.getOnHeapCachingTier().refresh();
}

private void cleanTieredCaches(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
cleanOnHeapCache(currentKeysToClean, currentFullClean);
cleanDiskCache(currentKeysToClean, currentFullClean);
/**
* Logic to clean up disk based cache.
* <p>
* TODO: cleanDiskCache is very specific to disk caching tier. We can refactor this to be more generic.
*/
synchronized void cleanDiskCache(double diskCachesCleanThresholdPercent) {
tieredCacheService.getDiskCachingTier().ifPresent(diskCachingTier -> {
if (diskCachingTier.count() == 0 || diskCleanupKeysPercentage() < diskCachesCleanThresholdPercent) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping disk cache keys cleanup");
}
return; // skip cleanup when the disk tier is empty or below the staleness threshold
}
Set<CleanupKey> currentKeysToClean = new HashSet<>();
Set<Object> currentFullClean = new HashSet<>();

categorizeKeysForCleanup(currentKeysToClean, currentFullClean);

// Early exit if no cleanup is needed
if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) {
return;
}
cleanUpKeys(diskCachingTier.keys().iterator(), currentKeysToClean, currentFullClean);
});
}
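
For orientation, here is a minimal sketch of how a caller could drive the two cleanup entry points on a schedule. The ScheduledExecutorService wiring, the one-minute interval, and the 50.0 threshold are illustrative assumptions, not this commit's actual trigger (which would live elsewhere, e.g. in IndicesService):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical driver, not part of this commit. Assumes `cache` is the
// IndicesRequestCache instance wired up elsewhere.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    cache.cleanCache();         // sweep stale reader/shard keys from the on-heap tier
    cache.cleanDiskCache(50.0); // sweep the disk tier only if >= 50% of its keys look stale
}, 1, 1, TimeUnit.MINUTES);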

// keeping this unsynchronized since we expect it to be called only by cleanCache, which is synchronized
private void cleanDiskCache(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
if (tieredCacheService.getDiskCachingTier().isEmpty()) {
logger.debug("Skipping disk cache keys cleanup since no disk cache is configured");
return;
}
final double cleanupKeysThresholdPercentage = 50.0; // TODO make this an index setting
int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier().get().count();
int totalKeysToCleanup = currentKeysToClean.size() + currentFullClean.size();

double cleanupKeysPercentage = ((double) totalKeysToCleanup / totalKeysInDiskCache) * 100;
if (cleanupKeysPercentage < cleanupKeysThresholdPercentage) {
logger.debug("Skipping disk cache keys cleanup since the keys to cleanup of {}% is not greater than " +
"the threshold percentage of {}%", cleanupKeysPercentage, cleanupKeysThresholdPercentage);
return;
synchronized double diskCleanupKeysPercentage() {
int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier()
.map(CachingTier::count)
.orElse(0);
if (totalKeysInDiskCache == 0 || keysToClean.isEmpty()) {
return 0;
}
return ((double) keysToClean.size() / totalKeysInDiskCache) * 100;

kiranprakash154 (Author, Collaborator) commented on Nov 28, 2023:

Moving Sagar's comment from my fork.

Sagar's comment:

I don't think keysToClean signifies the correct stale key entries on the disk cache. It will contain stale keys present in both the heap and disk caches, so keysToClean can be greater than totalKeysInDiskCache. Also, keysToClean comprises CleanupKeys, which only contain (indexShard + luceneCacheKey) and thus signify all the stale keys associated with a stale luceneCacheKey (or closed IndexShards), so again this is not accurate.

We will need to look at other ways to calculate stale key entries on the disk cache.

kiranprakash154 (Author, Collaborator) commented on Nov 29, 2023:

I have addressed this with a separate mapping. More to come in the next commit.
Thanks for pointing this out, great catch! @sgup432

}
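
To make the review thread above concrete, one hypothetical shape for the "separate mapping" fix is a per-tier record of which stale keys actually live on disk, so the percentage is no longer computed against the shared keysToClean set. Everything below (the TierType enum, field, and method names) is a guess at the direction, not the code that eventually landed:

import java.util.EnumSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch only; the real fix arrives in a later commit.
enum TierType { ON_HEAP, DISK }

private final ConcurrentMap<CleanupKey, EnumSet<TierType>> staleKeysByTier = new ConcurrentHashMap<>();

double diskStaleKeysPercentage(int totalKeysInDiskCache) {
    if (totalKeysInDiskCache == 0) {
        return 0;
    }
    // Count only the stale keys known to be resident on the disk tier.
    long diskStale = staleKeysByTier.values().stream()
        .filter(tiers -> tiers.contains(TierType.DISK))
        .count();
    return ((double) diskStale / totalKeysInDiskCache) * 100;
}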

Iterator<Key> iterator = tieredCacheService.getDiskCachingTier().get().keys().iterator();
synchronized void cleanUpKeys(Iterator<Key> iterator, Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
while (iterator.hasNext()) {
Key key = iterator.next();
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
currentFullClean.remove(key.entity.getCacheIdentity());
} else {
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
if (currentKeysToClean.contains(cleanupKey)) {
iterator.remove();
currentKeysToClean.remove(cleanupKey);
}
keysToClean.remove(cleanupKey); // since a key could be either in onHeap or disk cache.
} else if (currentKeysToClean.contains(cleanupKey)) {
iterator.remove();
currentKeysToClean.remove(cleanupKey);
keysToClean.remove(cleanupKey);
}
}
}
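
To see the two-set sweep in isolation, here is a self-contained toy version of the same pattern. The record types are simplified stand-ins for Key and CleanupKey, not the actual OpenSearch classes, and this sketch keeps each clean rule active for the whole sweep, a simplification of the method above:

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class SweepDemo {
    record Key(String entity, String readerId) {}
    record CleanupKey(String entity, String readerId) {}

    public static void main(String[] args) {
        // Three cached entries: two for shardA, one for shardB.
        Set<Key> tier = new HashSet<>(Set.of(
            new Key("shardA", "r1"), new Key("shardA", "r2"), new Key("shardB", "r1")));
        Set<Object> currentFullClean = new HashSet<>(Set.of("shardA"));  // shard closed: drop everything
        Set<CleanupKey> currentKeysToClean = new HashSet<>(Set.of(new CleanupKey("shardB", "r1")));

        Iterator<Key> it = tier.iterator();
        while (it.hasNext()) {
            Key key = it.next();
            if (currentFullClean.contains(key.entity())) {
                it.remove();  // full clean: remove every entry owned by the closed shard
            } else if (currentKeysToClean.contains(new CleanupKey(key.entity(), key.readerId()))) {
                it.remove();  // targeted clean: remove the single stale reader entry
            }
        }
        System.out.println(tier);  // prints [] since all three entries matched a clean rule
    }
}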

// keeping this unsynchronized since we expect it to be called only by cleanCache, which is synchronized
private void cleanOnHeapCache(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
Iterator<Key> iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator();
private void categorizeKeysForCleanup(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) {
Iterator<CleanupKey> iterator = keysToClean.iterator();
while (iterator.hasNext()) {
Key key = iterator.next();
if (currentFullClean.contains(key.entity.getCacheIdentity())) {
iterator.remove();
currentFullClean.remove(key.entity.getCacheIdentity());
CleanupKey cleanupKey = iterator.next();
if (needsFullClean(cleanupKey)) {
currentFullClean.add(cleanupKey.entity.getCacheIdentity());
} else {
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId);
if (currentKeysToClean.contains(cleanupKey)) {
iterator.remove();
currentKeysToClean.remove(cleanupKey);
}
currentKeysToClean.add(cleanupKey);
}
}
tieredCacheService.getOnHeapCachingTier().refresh();
}

private boolean needsFullClean(CleanupKey cleanupKey) {
Expand All @@ -520,4 +538,16 @@ long count() {
int numRegisteredCloseListeners() { // for testing
return registeredClosedListeners.size();
}

void addCleanupKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing
keysToClean.add(new CleanupKey(entity, readerCacheKeyId));
}

int getKeysToCleanSizeForTesting() { // for testing
return keysToClean.size();
}

Key createKeyForTesting(CacheEntity entity, String readerCacheKeyId) { // for testing
return new Key(entity, null, readerCacheKeyId);
}
}
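
Given the testing hooks above, a package-local unit test could exercise the staleness accounting roughly like this. The JUnit scaffolding, the entity stub, and the mocked tiered cache service are assumptions; only the hook names come from this commit:

// Hedged test sketch; assumes a same-package test with mocked collaborators
// (indicesService, tieredCacheService, entity) set up elsewhere.
public void testKeysToCleanAccounting() {
    IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, indicesService, tieredCacheService);
    assertEquals(0, cache.getKeysToCleanSizeForTesting());

    cache.addCleanupKeyForTesting(entity, "reader-key-1");
    assertEquals(1, cache.getKeysToCleanSizeForTesting());

    // With, say, four keys resident in the mocked disk tier, one stale key reports 25%.
    assertEquals(25.0, cache.diskCleanupKeysPercentage(), 0.001);
}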
