From e61d461d411a67454003638a00bb071c7afe689d Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 11 Jun 2024 22:59:58 -0700 Subject: [PATCH 01/20] use concurrentmap Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 34 +++++++--- .../indices/IndicesRequestCacheTests.java | 65 +++++++++++++++++-- 2 files changed, 83 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 5c82e5e9639f7..cb901a0735544 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -61,6 +61,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -75,7 +76,6 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -176,7 +176,8 @@ public final class IndicesRequestCache implements RemovalListener keysToClean; - private final ConcurrentMap> cleanupKeyToCountMap; + private final ConcurrentMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private volatile double stalenessThreshold; private final IndicesRequestCacheCleaner cacheCleaner; + private final boolean pluggableCacheEnabled; - IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { + IndicesRequestCacheCleanupManager( + ThreadPool threadpool, + TimeValue cleanInterval, + double stalenessThreshold, + boolean pluggableCacheEnabled + ) { this.stalenessThreshold = stalenessThreshold; this.keysToClean = ConcurrentCollections.newConcurrentSet(); this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); this.staleKeysCount = new AtomicInteger(0); this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); + this.pluggableCacheEnabled = pluggableCacheEnabled; threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); } @@ -556,7 +564,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * @param cleanupKey the CleanupKey to be updated in the map */ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { - if (cleanupKey.entity == null) { + if (pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -568,7 +576,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // If the key doesn't exist, it's added with a value of 1. // If the key exists, its value is incremented by 1. - cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); + addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId); + } + + // pkg-private for testing + void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) + .merge(readerCacheKeyId, 1, Integer::sum); } /** @@ -587,7 +601,7 @@ private void updateStaleCountOnEntryRemoval( CleanupKey cleanupKey, RemovalNotification, BytesReference> notification ) { - if (notification.getRemovalReason() == RemovalReason.REPLACED) { + if (pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -647,7 +661,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (cleanupKey.entity == null) { + if (pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -789,7 +803,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (cleanThresholdPercent == 0.0) { + if (pluggableCacheEnabled || cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); @@ -826,7 +840,7 @@ public void close() { } // for testing - ConcurrentMap> getCleanupKeyToCountMap() { + ConcurrentMap> getCleanupKeyToCountMap() { return cleanupKeyToCountMap; } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 9dbdddb76ea24..b2a83c02aa70a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -95,7 +95,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; +import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -105,7 +105,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; @@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( indexShard.hashCode() ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); // test the mappings - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); @@ -793,8 +795,55 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } - private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { - return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + // test adding to cleanupKeyToCountMap with multiple threads + public void testAddToCleanupKeyToCountMap() throws InterruptedException { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + cache = getIndicesRequestCache(settings); + + int numberOfThreads = 10; + int numberOfIterations = 1000; + Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread + AtomicBoolean exceptionDetected = new AtomicBoolean(false); + + ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.submit(() -> { + phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); + } + } catch (ConcurrentModificationException e) { + e.printStackTrace(); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + } + phaser.arriveAndAwaitAdvance(); // Start all threads + + // Main thread iterates over the map + executorService.submit(() -> { + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> { + v.forEach((k1, v1) -> { + // Accessing the map to create contention + v.get(k1); + }); + }); + } + } catch (ConcurrentModificationException e) { + e.printStackTrace(); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + assertFalse(exceptionDetected.get()); } private IndicesRequestCache getIndicesRequestCache(Settings settings) { @@ -808,6 +857,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) { ); } + private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { + return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + } + private Loader getLoader(DirectoryReader reader) { return new Loader(reader, 0); } From 5d08619c61ba4731e630b2b7fbbc3f7c72c29c76 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 10:32:59 -0700 Subject: [PATCH 02/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index b2a83c02aa70a..d3835d8be10ac 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -795,57 +795,6 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } - // test adding to cleanupKeyToCountMap with multiple threads - public void testAddToCleanupKeyToCountMap() throws InterruptedException { - threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); - cache = getIndicesRequestCache(settings); - - int numberOfThreads = 10; - int numberOfIterations = 1000; - Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread - AtomicBoolean exceptionDetected = new AtomicBoolean(false); - - ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); - - for (int i = 0; i < numberOfThreads; i++) { - executorService.submit(() -> { - phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time - try { - for (int j = 0; j < numberOfIterations; j++) { - cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); - } - } catch (ConcurrentModificationException e) { - e.printStackTrace(); - exceptionDetected.set(true); // Set flag if exception is detected - } - }); - } - phaser.arriveAndAwaitAdvance(); // Start all threads - - // Main thread iterates over the map - executorService.submit(() -> { - try { - for (int j = 0; j < numberOfIterations; j++) { - cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> { - v.forEach((k1, v1) -> { - // Accessing the map to create contention - v.get(k1); - }); - }); - } - } catch (ConcurrentModificationException e) { - e.printStackTrace(); - exceptionDetected.set(true); // Set flag if exception is detected - } - }); - - executorService.shutdown(); - executorService.awaitTermination(60, TimeUnit.SECONDS); - - assertFalse(exceptionDetected.get()); - } - private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); return new IndicesRequestCache( From 711d1254fd227b1911ee2770da3f9b9783ffa823 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 10:33:38 -0700 Subject: [PATCH 03/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index d3835d8be10ac..205712d388cd1 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -795,6 +795,56 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } + // test adding to cleanupKeyToCountMap with multiple threads + public void testAddToCleanupKeyToCountMap() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + cache = getIndicesRequestCache(settings); + + int numberOfThreads = 10; + int numberOfIterations = 1000; + Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread + AtomicBoolean exceptionDetected = new AtomicBoolean(false); + + ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.submit(() -> { + phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); + } + } catch (ConcurrentModificationException e) { + logger.error("ConcurrentModificationException detected in thread : " + e.getMessage()); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + } + phaser.arriveAndAwaitAdvance(); // Start all threads + + // Main thread iterates over the map + executorService.submit(() -> { + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> { + v.forEach((k1, v1) -> { + // Accessing the map to create contention + v.get(k1); + }); + }); + } + } catch (ConcurrentModificationException e) { + logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage()); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + assertFalse(exceptionDetected.get()); + } + private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); return new IndicesRequestCache( From e2409b10f9b76e4139b6e2037132111ff84ed165 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:16:43 -0700 Subject: [PATCH 04/20] Update CHANGELOG.md Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0066e077b16a2..2d77a4323984b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221)) ### Security From b2be0b048d9c1b5afa9422fd890f264998966d02 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:21:23 -0700 Subject: [PATCH 05/20] Update IndicesRequestCache.java Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index cb901a0735544..26c7e86e5d718 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -564,7 +564,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * @param cleanupKey the CleanupKey to be updated in the map */ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { - if (pluggableCacheEnabled || cleanupKey.entity == null) { + if (!pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -601,7 +601,7 @@ private void updateStaleCountOnEntryRemoval( CleanupKey cleanupKey, RemovalNotification, BytesReference> notification ) { - if (pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { + if (!pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -661,7 +661,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (pluggableCacheEnabled || cleanupKey.entity == null) { + if (!pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -803,7 +803,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (pluggableCacheEnabled || cleanThresholdPercent == 0.0) { + if (!pluggableCacheEnabled || cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); From 966b5b7faf452a59b03531658e5cbe10dba200c7 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:31:25 -0700 Subject: [PATCH 06/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 50 +++++++++++++++---- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 205712d388cd1..c763b26a8a258 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -394,7 +394,10 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { // when staleness count is higher than stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -429,7 +432,10 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() // when staleness count equal to stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); DirectoryReader reader = getReader(writer, indexShard.shardId()); @@ -461,7 +467,10 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -523,7 +532,10 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -584,7 +596,10 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -623,7 +638,10 @@ public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Ex // test staleness count based on removal notifications public void testStaleCount_OnRemovalNotifications() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -677,7 +695,10 @@ public void testStaleCount_OnRemovalNotifications() throws Exception { // when staleness count less than the stale threshold, stale keys should NOT be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -710,7 +731,10 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() // test the cleanupKeyToCountMap are set appropriately when both readers are closed public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -798,7 +822,10 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { // test adding to cleanupKeyToCountMap with multiple threads public void testAddToCleanupKeyToCountMap() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); int numberOfThreads = 10; @@ -1002,7 +1029,10 @@ public void testClosingIndexWipesStats() throws Exception { public void testCacheCleanupBasedOnStaleThreshold_thresholdUpdate() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); From 4173d8253e7a214a71cfb0eb26e255f031d46c98 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:38:32 -0700 Subject: [PATCH 07/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index c763b26a8a258..da76325b54e3c 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -391,6 +391,37 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { IOUtils.close(secondReader); } + // when the feature flag is disabled, stale keys should be cleaned up every time cache cleaner is invoked. + public void testCacheCleanupWhenFeatureFlagIsDisabled() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%") + .put(FeatureFlags.PLUGGABLE_CACHE, false) + .build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + // clean cache with 0% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should remove the stale-key + assertEquals(1, cache.count()); + IOUtils.close(secondReader); + } + // when staleness count is higher than stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { threadPool = getThreadPool(); @@ -819,6 +850,63 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } + // test the cleanupKeyToCountMap stays empty when the pluggable cache feature flag is disabled + public void testCleanupKeyToCountMapAreSetAppropriatelyWhenFeatureFlagIsDisabled() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, false) + .build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, shardId); + DirectoryReader secondReader = getReader(writer, shardId); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + // test the mappings + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + assertTrue(cleanupKeyToCountMap.isEmpty()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + // test the mapping + assertEquals(2, cache.count()); + assertTrue(cleanupKeyToCountMap.isEmpty()); + // create another entry for the second reader + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1")); + // test the mapping + assertEquals(3, cache.count()); + assertTrue(cleanupKeyToCountMap.isEmpty()); + + // Close the reader, to create stale entries + reader.close(); + // cache count should not be affected + assertEquals(3, cache.count()); + // test the mapping, cleanupKeyToCountMap should be empty + assertTrue(cleanupKeyToCountMap.isEmpty()); + // send removal notification for first reader + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + indexShard.shardId(), + getTermBytes(), + getReaderCacheKeyId(reader), + indexShard.hashCode() + ); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // test the mapping, it should stay the same + assertTrue(cleanupKeyToCountMap.isEmpty()); + + IOUtils.close(secondReader); + } + // test adding to cleanupKeyToCountMap with multiple threads public void testAddToCleanupKeyToCountMap() throws Exception { threadPool = getThreadPool(); From b8a6c79c9b9e48c899746ad6773dc9579a35c945 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 12:28:56 -0700 Subject: [PATCH 08/20] revert feature flags Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 21 +- .../indices/IndicesRequestCacheTests.java | 190 +----------------- 2 files changed, 16 insertions(+), 195 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 26c7e86e5d718..06cd77a34fe0b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -61,7 +61,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -176,8 +175,7 @@ public final class IndicesRequestCache implements RemovalListener, BytesReference> notification ) { - if (!pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { + if (notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -661,7 +652,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (!pluggableCacheEnabled || cleanupKey.entity == null) { + if (cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -803,7 +794,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (!pluggableCacheEnabled || cleanThresholdPercent == 0.0) { + if (cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index da76325b54e3c..907847490a70c 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -195,58 +195,6 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } - public void testBasicOperationsCacheWithFeatureFlag() throws Exception { - threadPool = getThreadPool(); - Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); - cache = getIndicesRequestCache(settings); - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = getReader(writer, indexShard.shardId()); - - // initial cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); - assertEquals("foo", value.streamInput().readString()); - ShardRequestCache requestCacheStats = indexShard.requestCache(); - assertEquals(0, requestCacheStats.stats().getHitCount()); - assertEquals(1, requestCacheStats.stats().getMissCount()); - assertEquals(0, requestCacheStats.stats().getEvictions()); - assertFalse(loader.loadedFromCache); - assertEquals(1, cache.count()); - - // cache hit - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, getTermBytes()); - assertEquals("foo", value.streamInput().readString()); - requestCacheStats = indexShard.requestCache(); - assertEquals(1, requestCacheStats.stats().getHitCount()); - assertEquals(1, requestCacheStats.stats().getMissCount()); - assertEquals(0, requestCacheStats.stats().getEvictions()); - assertTrue(loader.loadedFromCache); - assertEquals(1, cache.count()); - assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); - assertEquals(1, cache.numRegisteredCloseListeners()); - - // Closing the cache doesn't modify an already returned CacheEntity - if (randomBoolean()) { - reader.close(); - } else { - indexShard.close("test", true, true); // closed shard but reader is still open - cache.clear(entity); - } - cache.cacheCleanupManager.cleanCache(); - assertEquals(1, requestCacheStats.stats().getHitCount()); - assertEquals(1, requestCacheStats.stats().getMissCount()); - assertEquals(0, requestCacheStats.stats().getEvictions()); - assertTrue(loader.loadedFromCache); - assertEquals(0, cache.count()); - assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - - IOUtils.close(reader); - assertEquals(0, cache.numRegisteredCloseListeners()); - } - public void testCacheDifferentReaders() throws Exception { threadPool = getThreadPool(); cache = getIndicesRequestCache(Settings.EMPTY); @@ -391,44 +339,10 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { IOUtils.close(secondReader); } - // when the feature flag is disabled, stale keys should be cleaned up every time cache cleaner is invoked. - public void testCacheCleanupWhenFeatureFlagIsDisabled() throws Exception { - threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%") - .put(FeatureFlags.PLUGGABLE_CACHE, false) - .build(); - cache = getIndicesRequestCache(settings); - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = getReader(writer, indexShard.shardId()); - DirectoryReader secondReader = getReader(writer, indexShard.shardId()); - - // Get 2 entries into the cache - cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); - assertEquals(1, cache.count()); - - cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); - assertEquals(2, cache.count()); - - // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. - reader.close(); - // cache count should not be affected - assertEquals(2, cache.count()); - // clean cache with 0% staleness threshold - cache.cacheCleanupManager.cleanCache(); - // cleanup should remove the stale-key - assertEquals(1, cache.count()); - IOUtils.close(secondReader); - } - // when staleness count is higher than stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -463,10 +377,7 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() // when staleness count equal to stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); DirectoryReader reader = getReader(writer, indexShard.shardId()); @@ -498,10 +409,7 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -563,10 +471,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -627,10 +532,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -669,10 +571,7 @@ public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Ex // test staleness count based on removal notifications public void testStaleCount_OnRemovalNotifications() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -726,10 +625,7 @@ public void testStaleCount_OnRemovalNotifications() throws Exception { // when staleness count less than the stale threshold, stale keys should NOT be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -762,10 +658,7 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() // test the cleanupKeyToCountMap are set appropriately when both readers are closed public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -850,70 +743,10 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } - // test the cleanupKeyToCountMap stays empty when the pluggable cache feature flag is disabled - public void testCleanupKeyToCountMapAreSetAppropriatelyWhenFeatureFlagIsDisabled() throws Exception { - threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, false) - .build(); - cache = getIndicesRequestCache(settings); - - writer.addDocument(newDoc(0, "foo")); - ShardId shardId = indexShard.shardId(); - DirectoryReader reader = getReader(writer, shardId); - DirectoryReader secondReader = getReader(writer, shardId); - - // Get 2 entries into the cache from 2 different readers - cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); - assertEquals(1, cache.count()); - // test the mappings - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); - assertTrue(cleanupKeyToCountMap.isEmpty()); - - cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); - // test the mapping - assertEquals(2, cache.count()); - assertTrue(cleanupKeyToCountMap.isEmpty()); - // create another entry for the second reader - cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1")); - // test the mapping - assertEquals(3, cache.count()); - assertTrue(cleanupKeyToCountMap.isEmpty()); - - // Close the reader, to create stale entries - reader.close(); - // cache count should not be affected - assertEquals(3, cache.count()); - // test the mapping, cleanupKeyToCountMap should be empty - assertTrue(cleanupKeyToCountMap.isEmpty()); - // send removal notification for first reader - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - indexShard.shardId(), - getTermBytes(), - getReaderCacheKeyId(reader), - indexShard.hashCode() - ); - cache.onRemoval( - new RemovalNotification, BytesReference>( - new ICacheKey<>(key), - getTermBytes(), - RemovalReason.EVICTED - ) - ); - // test the mapping, it should stay the same - assertTrue(cleanupKeyToCountMap.isEmpty()); - - IOUtils.close(secondReader); - } - // test adding to cleanupKeyToCountMap with multiple threads public void testAddToCleanupKeyToCountMap() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); int numberOfThreads = 10; @@ -1117,10 +950,7 @@ public void testClosingIndexWipesStats() throws Exception { public void testCacheCleanupBasedOnStaleThreshold_thresholdUpdate() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); From 38ef8ba1f35f9831304f453764440a09beca2743 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 14:23:46 -0700 Subject: [PATCH 09/20] changelog to releaselog Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 - release-notes/opensearch.release-notes-2.15.0.md | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d77a4323984b..0066e077b16a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed -- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221)) ### Security diff --git a/release-notes/opensearch.release-notes-2.15.0.md b/release-notes/opensearch.release-notes-2.15.0.md index 4e95173abd700..02458b0c89b7d 100644 --- a/release-notes/opensearch.release-notes-2.15.0.md +++ b/release-notes/opensearch.release-notes-2.15.0.md @@ -70,4 +70,5 @@ - Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015)) - Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710)) - Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146)) -- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219)) \ No newline at end of file +- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219)) +- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221)) \ No newline at end of file From 10a907647438d61e26550d7e0dff0bd25d2c4b31 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 11 Jun 2024 22:59:58 -0700 Subject: [PATCH 10/20] use concurrentmap Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 21 ++- .../indices/IndicesRequestCacheTests.java | 120 ++++++++++-------- 2 files changed, 80 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 06cd77a34fe0b..cb901a0735544 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -61,6 +61,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -175,7 +176,8 @@ public final class IndicesRequestCache implements RemovalListener, BytesReference> notification ) { - if (notification.getRemovalReason() == RemovalReason.REPLACED) { + if (pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -652,7 +661,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (cleanupKey.entity == null) { + if (pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -794,7 +803,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (cleanThresholdPercent == 0.0) { + if (pluggableCacheEnabled || cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 907847490a70c..723967e12304e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -195,6 +195,58 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } + public void testBasicOperationsCacheWithFeatureFlag() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + + // initial cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); + assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + + // cache hit + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); + assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // Closing the cache doesn't modify an already returned CacheEntity + if (randomBoolean()) { + reader.close(); + } else { + indexShard.close("test", true, true); // closed shard but reader is still open + cache.clear(entity); + } + cache.cacheCleanupManager.cleanCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(0, cache.count()); + assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); + + IOUtils.close(reader); + assertEquals(0, cache.numRegisteredCloseListeners()); + } + public void testCacheDifferentReaders() throws Exception { threadPool = getThreadPool(); cache = getIndicesRequestCache(Settings.EMPTY); @@ -744,7 +796,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { } // test adding to cleanupKeyToCountMap with multiple threads - public void testAddToCleanupKeyToCountMap() throws Exception { + public void testAddToCleanupKeyToCountMap() throws InterruptedException { threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); @@ -764,7 +816,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception { cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); } } catch (ConcurrentModificationException e) { - logger.error("ConcurrentModificationException detected in thread : " + e.getMessage()); + e.printStackTrace(); exceptionDetected.set(true); // Set flag if exception is detected } }); @@ -783,21 +835,28 @@ public void testAddToCleanupKeyToCountMap() throws Exception { }); } } catch (ConcurrentModificationException e) { - logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage()); + e.printStackTrace(); exceptionDetected.set(true); // Set flag if exception is detected } }); executorService.shutdown(); executorService.awaitTermination(60, TimeUnit.SECONDS); + assertFalse(exceptionDetected.get()); } private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); - return new IndicesRequestCache( - settings, - indicesService.indicesRequestCache.cacheEntityLookup, + return new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool, ClusterServiceUtils.createClusterService(threadPool) @@ -1414,55 +1473,6 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep IOUtils.close(reader, writer, dir, cache); } - public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception { - threadPool = getThreadPool(); - String indexName = "test1"; - // Create a shard - IndexService indexService = createIndex( - indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() - ); - IndexShard indexShard = indexService.getShard(0); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - writer.addDocument(newDoc(0, "foo")); - writer.addDocument(newDoc(1, "hack")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); - Loader loader = new Loader(reader, 0); - - // Set clean interval to a high value as we will do it manually here. - IndicesRequestCache cache = getIndicesRequestCache( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000)) - .build() - ); - IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar"); - - // Cache some values for indexShard - BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes()); - - // Verify response and stats. - assertEquals("foo", value.streamInput().readString()); - RequestCacheStats stats = indexShard.requestCache().stats(); - assertEquals("foo", value.streamInput().readString()); - assertEquals(1, cache.count()); - assertEquals(1, stats.getMissCount()); - assertTrue(stats.getMemorySizeInBytes() > 0); - - // Remove the shard making its cache entries stale - IOUtils.close(reader, writer, dir); - indexService.removeShard(0, "force"); - - assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS); - - // Trigger clean up of cache. Should not throw any exception. - cache.cacheCleanupManager.cleanCache(); - // Verify all cleared up. - assertEquals(0, cache.count()); - IOUtils.close(cache); - } - public static String generateString(int length) { String characters = "abcdefghijklmnopqrstuvwxyz"; StringBuilder sb = new StringBuilder(length); From 414821f43dee48ea005d3a86e7d344fa848d352f Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 10:32:59 -0700 Subject: [PATCH 11/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 723967e12304e..1e82e25abcf0d 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -795,57 +795,6 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } - // test adding to cleanupKeyToCountMap with multiple threads - public void testAddToCleanupKeyToCountMap() throws InterruptedException { - threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); - cache = getIndicesRequestCache(settings); - - int numberOfThreads = 10; - int numberOfIterations = 1000; - Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread - AtomicBoolean exceptionDetected = new AtomicBoolean(false); - - ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); - - for (int i = 0; i < numberOfThreads; i++) { - executorService.submit(() -> { - phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time - try { - for (int j = 0; j < numberOfIterations; j++) { - cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); - } - } catch (ConcurrentModificationException e) { - e.printStackTrace(); - exceptionDetected.set(true); // Set flag if exception is detected - } - }); - } - phaser.arriveAndAwaitAdvance(); // Start all threads - - // Main thread iterates over the map - executorService.submit(() -> { - try { - for (int j = 0; j < numberOfIterations; j++) { - cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> { - v.forEach((k1, v1) -> { - // Accessing the map to create contention - v.get(k1); - }); - }); - } - } catch (ConcurrentModificationException e) { - e.printStackTrace(); - exceptionDetected.set(true); // Set flag if exception is detected - } - }); - - executorService.shutdown(); - executorService.awaitTermination(60, TimeUnit.SECONDS); - - assertFalse(exceptionDetected.get()); - } - private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); return new IndicesRequestCache(settings, (shardId -> { From 1bddc7ae362de1f729073c48d28ce2ad945acb60 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 10:33:38 -0700 Subject: [PATCH 12/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 1e82e25abcf0d..fdbc7f05c5f6c 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -795,6 +795,56 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } + // test adding to cleanupKeyToCountMap with multiple threads + public void testAddToCleanupKeyToCountMap() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + cache = getIndicesRequestCache(settings); + + int numberOfThreads = 10; + int numberOfIterations = 1000; + Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread + AtomicBoolean exceptionDetected = new AtomicBoolean(false); + + ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); + + for (int i = 0; i < numberOfThreads; i++) { + executorService.submit(() -> { + phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString()); + } + } catch (ConcurrentModificationException e) { + logger.error("ConcurrentModificationException detected in thread : " + e.getMessage()); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + } + phaser.arriveAndAwaitAdvance(); // Start all threads + + // Main thread iterates over the map + executorService.submit(() -> { + try { + for (int j = 0; j < numberOfIterations; j++) { + cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> { + v.forEach((k1, v1) -> { + // Accessing the map to create contention + v.get(k1); + }); + }); + } + } catch (ConcurrentModificationException e) { + logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage()); + exceptionDetected.set(true); // Set flag if exception is detected + } + }); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + assertFalse(exceptionDetected.get()); + } + private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); return new IndicesRequestCache(settings, (shardId -> { From 992b4ca984abfbee167441cd3ae0ac2b3bd58c7c Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:16:43 -0700 Subject: [PATCH 13/20] Update CHANGELOG.md Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0066e077b16a2..2d77a4323984b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221)) ### Security From 8ee67587cb90e3dfdb29d1c9c451107bcaeb9772 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:21:23 -0700 Subject: [PATCH 14/20] Update IndicesRequestCache.java Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCache.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index cb901a0735544..26c7e86e5d718 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -564,7 +564,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * @param cleanupKey the CleanupKey to be updated in the map */ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { - if (pluggableCacheEnabled || cleanupKey.entity == null) { + if (!pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -601,7 +601,7 @@ private void updateStaleCountOnEntryRemoval( CleanupKey cleanupKey, RemovalNotification, BytesReference> notification ) { - if (pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { + if (!pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -661,7 +661,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (pluggableCacheEnabled || cleanupKey.entity == null) { + if (!pluggableCacheEnabled || cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -803,7 +803,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (pluggableCacheEnabled || cleanThresholdPercent == 0.0) { + if (!pluggableCacheEnabled || cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); From ec1856e79e8af8a063c6d99dd80b8eecacc4fd28 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:31:25 -0700 Subject: [PATCH 15/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 50 +++++++++++++++---- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index fdbc7f05c5f6c..fbe760500b495 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -394,7 +394,10 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { // when staleness count is higher than stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -429,7 +432,10 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() // when staleness count equal to stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); DirectoryReader reader = getReader(writer, indexShard.shardId()); @@ -461,7 +467,10 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -523,7 +532,10 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -584,7 +596,10 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -623,7 +638,10 @@ public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Ex // test staleness count based on removal notifications public void testStaleCount_OnRemovalNotifications() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -677,7 +695,10 @@ public void testStaleCount_OnRemovalNotifications() throws Exception { // when staleness count less than the stale threshold, stale keys should NOT be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -710,7 +731,10 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() // test the cleanupKeyToCountMap are set appropriately when both readers are closed public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -798,7 +822,10 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { // test adding to cleanupKeyToCountMap with multiple threads public void testAddToCleanupKeyToCountMap() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); int numberOfThreads = 10; @@ -1008,7 +1035,10 @@ public void testClosingIndexWipesStats() throws Exception { public void testCacheCleanupBasedOnStaleThreshold_thresholdUpdate() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") + .put(FeatureFlags.PLUGGABLE_CACHE, true) + .build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); From 9642106844e47c29eb81475c60587f145acee956 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 11:38:32 -0700 Subject: [PATCH 16/20] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index fbe760500b495..8629a14cce631 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -391,6 +391,37 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { IOUtils.close(secondReader); } + // when the feature flag is disabled, stale keys should be cleaned up every time cache cleaner is invoked. + public void testCacheCleanupWhenFeatureFlagIsDisabled() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%") + .put(FeatureFlags.PLUGGABLE_CACHE, false) + .build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + // 1 out of 2 keys ie 50% are now stale. + reader.close(); + // cache count should not be affected + assertEquals(2, cache.count()); + // clean cache with 0% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should remove the stale-key + assertEquals(1, cache.count()); + IOUtils.close(secondReader); + } + // when staleness count is higher than stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { threadPool = getThreadPool(); @@ -819,6 +850,63 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } + // test the cleanupKeyToCountMap stays empty when the pluggable cache feature flag is disabled + public void testCleanupKeyToCountMapAreSetAppropriatelyWhenFeatureFlagIsDisabled() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder() + .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") + .put(FeatureFlags.PLUGGABLE_CACHE, false) + .build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, shardId); + DirectoryReader secondReader = getReader(writer, shardId); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + // test the mappings + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + assertTrue(cleanupKeyToCountMap.isEmpty()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + // test the mapping + assertEquals(2, cache.count()); + assertTrue(cleanupKeyToCountMap.isEmpty()); + // create another entry for the second reader + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1")); + // test the mapping + assertEquals(3, cache.count()); + assertTrue(cleanupKeyToCountMap.isEmpty()); + + // Close the reader, to create stale entries + reader.close(); + // cache count should not be affected + assertEquals(3, cache.count()); + // test the mapping, cleanupKeyToCountMap should be empty + assertTrue(cleanupKeyToCountMap.isEmpty()); + // send removal notification for first reader + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + indexShard.shardId(), + getTermBytes(), + getReaderCacheKeyId(reader), + indexShard.hashCode() + ); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // test the mapping, it should stay the same + assertTrue(cleanupKeyToCountMap.isEmpty()); + + IOUtils.close(secondReader); + } + // test adding to cleanupKeyToCountMap with multiple threads public void testAddToCleanupKeyToCountMap() throws Exception { threadPool = getThreadPool(); From 588689f8bbd6190a57b15dc8010747688e5f5348 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 12:28:56 -0700 Subject: [PATCH 17/20] revert feature flags Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCache.java | 21 +- .../indices/IndicesRequestCacheTests.java | 190 +----------------- 2 files changed, 16 insertions(+), 195 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 26c7e86e5d718..06cd77a34fe0b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -61,7 +61,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.RatioValue; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; @@ -176,8 +175,7 @@ public final class IndicesRequestCache implements RemovalListener, BytesReference> notification ) { - if (!pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) { + if (notification.getRemovalReason() == RemovalReason.REPLACED) { // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry // does not affect the staleness count, we skip such notifications. return; @@ -661,7 +652,7 @@ private void updateStaleCountOnEntryRemoval( * @param cleanupKey the CleanupKey that has been marked for cleanup */ private void incrementStaleKeysCount(CleanupKey cleanupKey) { - if (!pluggableCacheEnabled || cleanupKey.entity == null) { + if (cleanupKey.entity == null) { return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -803,7 +794,7 @@ private synchronized void cleanCache(double stalenessThreshold) { * @return true if the cache cleanup process can be skipped, false otherwise. */ private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) { - if (!pluggableCacheEnabled || cleanThresholdPercent == 0.0) { + if (cleanThresholdPercent == 0.0) { return false; } double staleKeysInCachePercentage = staleKeysInCachePercentage(); diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8629a14cce631..369a6ee73058f 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -195,58 +195,6 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } - public void testBasicOperationsCacheWithFeatureFlag() throws Exception { - threadPool = getThreadPool(); - Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); - cache = getIndicesRequestCache(settings); - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = getReader(writer, indexShard.shardId()); - - // initial cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); - assertEquals("foo", value.streamInput().readString()); - ShardRequestCache requestCacheStats = indexShard.requestCache(); - assertEquals(0, requestCacheStats.stats().getHitCount()); - assertEquals(1, requestCacheStats.stats().getMissCount()); - assertEquals(0, requestCacheStats.stats().getEvictions()); - assertFalse(loader.loadedFromCache); - assertEquals(1, cache.count()); - - // cache hit - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, getTermBytes()); - assertEquals("foo", value.streamInput().readString()); - requestCacheStats = indexShard.requestCache(); - assertEquals(1, requestCacheStats.stats().getHitCount()); - assertEquals(1, requestCacheStats.stats().getMissCount()); - assertEquals(0, requestCacheStats.stats().getEvictions()); - assertTrue(loader.loadedFromCache); - assertEquals(1, cache.count()); - assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); - assertEquals(1, cache.numRegisteredCloseListeners()); - - // Closing the cache doesn't modify an already returned CacheEntity - if (randomBoolean()) { - reader.close(); - } else { - indexShard.close("test", true, true); // closed shard but reader is still open - cache.clear(entity); - } - cache.cacheCleanupManager.cleanCache(); - assertEquals(1, requestCacheStats.stats().getHitCount()); - assertEquals(1, requestCacheStats.stats().getMissCount()); - assertEquals(0, requestCacheStats.stats().getEvictions()); - assertTrue(loader.loadedFromCache); - assertEquals(0, cache.count()); - assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - - IOUtils.close(reader); - assertEquals(0, cache.numRegisteredCloseListeners()); - } - public void testCacheDifferentReaders() throws Exception { threadPool = getThreadPool(); cache = getIndicesRequestCache(Settings.EMPTY); @@ -391,44 +339,10 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { IOUtils.close(secondReader); } - // when the feature flag is disabled, stale keys should be cleaned up every time cache cleaner is invoked. - public void testCacheCleanupWhenFeatureFlagIsDisabled() throws Exception { - threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%") - .put(FeatureFlags.PLUGGABLE_CACHE, false) - .build(); - cache = getIndicesRequestCache(settings); - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = getReader(writer, indexShard.shardId()); - DirectoryReader secondReader = getReader(writer, indexShard.shardId()); - - // Get 2 entries into the cache - cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); - assertEquals(1, cache.count()); - - cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); - assertEquals(2, cache.count()); - - // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. - reader.close(); - // cache count should not be affected - assertEquals(2, cache.count()); - // clean cache with 0% staleness threshold - cache.cacheCleanupManager.cleanCache(); - // cleanup should remove the stale-key - assertEquals(1, cache.count()); - IOUtils.close(secondReader); - } - // when staleness count is higher than stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -463,10 +377,7 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() // when staleness count equal to stale threshold, stale keys should be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); DirectoryReader reader = getReader(writer, indexShard.shardId()); @@ -498,10 +409,7 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() th // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -563,10 +471,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); ShardId shardId = indexShard.shardId(); @@ -627,10 +532,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -669,10 +571,7 @@ public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Ex // test staleness count based on removal notifications public void testStaleCount_OnRemovalNotifications() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -726,10 +625,7 @@ public void testStaleCount_OnRemovalNotifications() throws Exception { // when staleness count less than the stale threshold, stale keys should NOT be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -762,10 +658,7 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() // test the cleanupKeyToCountMap are set appropriately when both readers are closed public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); @@ -850,70 +743,10 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { IOUtils.close(secondReader); } - // test the cleanupKeyToCountMap stays empty when the pluggable cache feature flag is disabled - public void testCleanupKeyToCountMapAreSetAppropriatelyWhenFeatureFlagIsDisabled() throws Exception { - threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51") - .put(FeatureFlags.PLUGGABLE_CACHE, false) - .build(); - cache = getIndicesRequestCache(settings); - - writer.addDocument(newDoc(0, "foo")); - ShardId shardId = indexShard.shardId(); - DirectoryReader reader = getReader(writer, shardId); - DirectoryReader secondReader = getReader(writer, shardId); - - // Get 2 entries into the cache from 2 different readers - cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); - assertEquals(1, cache.count()); - // test the mappings - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); - assertTrue(cleanupKeyToCountMap.isEmpty()); - - cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); - // test the mapping - assertEquals(2, cache.count()); - assertTrue(cleanupKeyToCountMap.isEmpty()); - // create another entry for the second reader - cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1")); - // test the mapping - assertEquals(3, cache.count()); - assertTrue(cleanupKeyToCountMap.isEmpty()); - - // Close the reader, to create stale entries - reader.close(); - // cache count should not be affected - assertEquals(3, cache.count()); - // test the mapping, cleanupKeyToCountMap should be empty - assertTrue(cleanupKeyToCountMap.isEmpty()); - // send removal notification for first reader - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - indexShard.shardId(), - getTermBytes(), - getReaderCacheKeyId(reader), - indexShard.hashCode() - ); - cache.onRemoval( - new RemovalNotification, BytesReference>( - new ICacheKey<>(key), - getTermBytes(), - RemovalReason.EVICTED - ) - ); - // test the mapping, it should stay the same - assertTrue(cleanupKeyToCountMap.isEmpty()); - - IOUtils.close(secondReader); - } - // test adding to cleanupKeyToCountMap with multiple threads public void testAddToCleanupKeyToCountMap() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); int numberOfThreads = 10; @@ -1123,10 +956,7 @@ public void testClosingIndexWipesStats() throws Exception { public void testCacheCleanupBasedOnStaleThreshold_thresholdUpdate() throws Exception { threadPool = getThreadPool(); - Settings settings = Settings.builder() - .put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%") - .put(FeatureFlags.PLUGGABLE_CACHE, true) - .build(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); From e89706a1b40318ca05359801021c29c61d493c68 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 14:23:46 -0700 Subject: [PATCH 18/20] changelog to releaselog Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d77a4323984b..0066e077b16a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed -- Fix Concurrent Modification Exception in Indices Request Cache([#14032](https://github.com/opensearch-project/OpenSearch/pull/14221)) ### Security From 09defd51ea11f4035a4d856639a972bc5e7c98a6 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 14:32:34 -0700 Subject: [PATCH 19/20] revert the test removal Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 369a6ee73058f..fdbc7f05c5f6c 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -195,6 +195,58 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.numRegisteredCloseListeners()); } + public void testBasicOperationsCacheWithFeatureFlag() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + + // initial cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); + assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + + // cache hit + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); + assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // Closing the cache doesn't modify an already returned CacheEntity + if (randomBoolean()) { + reader.close(); + } else { + indexShard.close("test", true, true); // closed shard but reader is still open + cache.clear(entity); + } + cache.cacheCleanupManager.cleanCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(0, cache.count()); + assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); + + IOUtils.close(reader); + assertEquals(0, cache.numRegisteredCloseListeners()); + } + public void testCacheDifferentReaders() throws Exception { threadPool = getThreadPool(); cache = getIndicesRequestCache(Settings.EMPTY); From a3b76258fc5dcf2ceb966e158ed12cb75311a594 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 12 Jun 2024 14:48:53 -0700 Subject: [PATCH 20/20] revert the conflict resolutions Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheTests.java | 61 ++++++++++++++++--- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index fdbc7f05c5f6c..205712d388cd1 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -847,15 +847,9 @@ public void testAddToCleanupKeyToCountMap() throws Exception { private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); - return new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), + return new IndicesRequestCache( + settings, + indicesService.indicesRequestCache.cacheEntityLookup, new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool, ClusterServiceUtils.createClusterService(threadPool) @@ -1472,6 +1466,55 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep IOUtils.close(reader, writer, dir, cache); } + public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception { + threadPool = getThreadPool(); + String indexName = "test1"; + // Create a shard + IndexService indexService = createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + IndexShard indexShard = indexService.getShard(0); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + writer.addDocument(newDoc(0, "foo")); + writer.addDocument(newDoc(1, "hack")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); + Loader loader = new Loader(reader, 0); + + // Set clean interval to a high value as we will do it manually here. + IndicesRequestCache cache = getIndicesRequestCache( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000)) + .build() + ); + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar"); + + // Cache some values for indexShard + BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes()); + + // Verify response and stats. + assertEquals("foo", value.streamInput().readString()); + RequestCacheStats stats = indexShard.requestCache().stats(); + assertEquals("foo", value.streamInput().readString()); + assertEquals(1, cache.count()); + assertEquals(1, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() > 0); + + // Remove the shard making its cache entries stale + IOUtils.close(reader, writer, dir); + indexService.removeShard(0, "force"); + + assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS); + + // Trigger clean up of cache. Should not throw any exception. + cache.cacheCleanupManager.cleanCache(); + // Verify all cleared up. + assertEquals(0, cache.count()); + IOUtils.close(cache); + } + public static String generateString(int length) { String characters = "abcdefghijklmnopqrstuvwxyz"; StringBuilder sb = new StringBuilder(length);