forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
1 changed file
with
70 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -170,6 +170,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic | |
.setTieredCacheEventListener(this) | ||
.withPolicy(new DiskTierTookTimePolicy(settings, clusterSettings, transformationFunction)) | ||
.build(); | ||
|
||
this.indicesService = indicesService; | ||
} | ||
|
||
|
@@ -223,7 +224,7 @@ BytesReference getOrCompute( | |
Loader cacheLoader = new Loader(cacheEntity, loader); | ||
BytesReference value = tieredCacheService.computeIfAbsent(key, cacheLoader); | ||
if (cacheLoader.isLoaded()) { | ||
// see if its the first time we see this reader, and make sure to register a cleanup key | ||
// see if it's the first time we see this reader, and make sure to register a cleanup key | ||
CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); | ||
if (!registeredClosedListeners.containsKey(cleanupKey)) { | ||
Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); | ||
|
@@ -425,36 +426,90 @@ public int hashCode() { | |
/** | ||
* Logic to clean up in-memory cache. | ||
*/ | ||
synchronized void cleanCache() { | ||
synchronized void cleanCache() { // TODO rename this method to plural or cleanTieredCache ? | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
final Set<CleanupKey> currentKeysToClean = new HashSet<>(); | ||
final Set<Object> currentFullClean = new HashSet<>(); | ||
currentKeysToClean.clear(); | ||
currentFullClean.clear(); | ||
for (Iterator<CleanupKey> iterator = keysToClean.iterator(); iterator.hasNext();) { | ||
|
||
Iterator<CleanupKey> iterator = keysToClean.iterator(); | ||
while(iterator.hasNext()) { | ||
CleanupKey cleanupKey = iterator.next(); | ||
iterator.remove(); | ||
if (cleanupKey.readerCacheKeyId == null || cleanupKey.entity.isOpen() == false) { | ||
// null indicates full cleanup, as does a closed shard | ||
if (needsFullClean(cleanupKey)) { | ||
currentFullClean.add(cleanupKey.entity.getCacheIdentity()); | ||
} else { | ||
currentKeysToClean.add(cleanupKey); | ||
} | ||
} | ||
if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { | ||
for (Iterator<Key> iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); iterator.hasNext();) { | ||
Key key = iterator.next(); | ||
if (currentFullClean.contains(key.entity.getCacheIdentity())) { | ||
|
||
// Early exit if no cleanup is needed | ||
if (currentKeysToClean.isEmpty() && currentFullClean.isEmpty()) { | ||
return; | ||
} | ||
cleanTieredCaches(currentKeysToClean, currentFullClean); | ||
} | ||
|
||
private void cleanTieredCaches(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) { | ||
cleanOnHeapCache(currentKeysToClean, currentFullClean); | ||
cleanDiskCache(currentKeysToClean, currentFullClean); | ||
} | ||
|
||
// keeping this unsynchronized since we don't expect it to be called only by cleanCache which is synchronized | ||
private void cleanDiskCache(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) { | ||
if (tieredCacheService.getDiskCachingTier().isEmpty()) { | ||
This comment has been minimized.
Sorry, something went wrong.
kiranprakash154
Author
Collaborator
|
||
logger.debug("Skipping disk cache keys cleanup since no disk cache is configured"); | ||
return; | ||
} | ||
final double cleanupKeysThresholdPercentage = 50.0; // TODO make this an index setting | ||
int totalKeysInDiskCache = tieredCacheService.getDiskCachingTier().get().count(); | ||
int totalKeysToCleanup = currentKeysToClean.size() + currentFullClean.size(); | ||
|
||
double cleanupKeysPercentage = ((double) totalKeysToCleanup / totalKeysInDiskCache) * 100; | ||
if (cleanupKeysPercentage < cleanupKeysThresholdPercentage) { | ||
logger.debug("Skipping disk cache keys cleanup since the keys to cleanup of {}% is not greater than " + | ||
"the threshold percentage of {}%", cleanupKeysPercentage, cleanupKeysThresholdPercentage); | ||
return; | ||
} | ||
|
||
Iterator<Key> iterator = tieredCacheService.getDiskCachingTier().get().keys().iterator(); | ||
while (iterator.hasNext()) { | ||
Key key = iterator.next(); | ||
if (currentFullClean.contains(key.entity.getCacheIdentity())) { | ||
iterator.remove(); | ||
currentFullClean.remove(key.entity.getCacheIdentity()); | ||
} else { | ||
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); | ||
if (currentKeysToClean.contains(cleanupKey)) { | ||
iterator.remove(); | ||
currentKeysToClean.remove(cleanupKey); | ||
} | ||
} | ||
} | ||
} | ||
|
||
// keeping this unsynchronized since we don't expect it to be called only by cleanCache which is synchronized | ||
private void cleanOnHeapCache(Set<CleanupKey> currentKeysToClean, Set<Object> currentFullClean) { | ||
Iterator<Key> iterator = tieredCacheService.getOnHeapCachingTier().keys().iterator(); | ||
while (iterator.hasNext()) { | ||
Key key = iterator.next(); | ||
if (currentFullClean.contains(key.entity.getCacheIdentity())) { | ||
iterator.remove(); | ||
currentFullClean.remove(key.entity.getCacheIdentity()); | ||
} else { | ||
CleanupKey cleanupKey = new CleanupKey(key.entity, key.readerCacheKeyId); | ||
if (currentKeysToClean.contains(cleanupKey)) { | ||
iterator.remove(); | ||
} else { | ||
if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { | ||
iterator.remove(); | ||
} | ||
currentKeysToClean.remove(cleanupKey); | ||
} | ||
} | ||
} | ||
tieredCacheService.getOnHeapCachingTier().refresh(); | ||
} | ||
|
||
private boolean needsFullClean(CleanupKey cleanupKey) { | ||
// null indicates full cleanup, as does a closed shard | ||
return cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen(); | ||
} | ||
|
||
/** | ||
* Returns the current size of the cache | ||
*/ | ||
|
Moving Sagar's comment from my fork
Sagar's comment -
Lets separate the logic and frequency of on-heap and disk cache. So that we can control the frequency of disk cache clean up separately. We can start with 1min but have a cluster setting available to tweak it if needed. This would be a static setting same as onHeap setting.