Skip to content

Commit

Permalink
[Tiered Cache] Use ConcurrentHashMap explicitly in IndicesRequestCache (
Browse files Browse the repository at this point in the history
#14409)

Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 authored Jun 18, 2024
1 parent 903784b commit 823ce68
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception {
}, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS);
}

// when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys
// when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys
public void testStaleKeysCleanupWithMultipleIndices() throws Exception {
int cacheCleanIntervalInMillis = 10;
String node = internalCluster().startNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -506,15 +507,15 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
this.staleKeysCount = new AtomicInteger(0);
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -572,8 +573,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
}

/**
Expand Down Expand Up @@ -831,7 +831,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down Expand Up @@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
}

// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws Exception {
public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

Expand All @@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});
}
Expand All @@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
assertFalse(exceptionDetected.get());
assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
assertEquals(
numberOfThreads * numberOfIterations,
cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size()
);
assertFalse(concurrentModificationExceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
Expand Down

0 comments on commit 823ce68

Please sign in to comment.