Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Oct 3, 2024
1 parent d471088 commit b774ae2
Showing 1 changed file with 0 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,6 @@ private synchronized void cleanCache(double stalenessThreshold) {
if (canSkipCacheCleanup(stalenessThreshold)) {
return;
}
ThreadPoolExecutor removalPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(NUM_CLEANUP_KEY_THREADS);
// Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
// Contains CleanupKey objects for a full cache cleanup.
Expand Down Expand Up @@ -789,20 +788,17 @@ private synchronized void cleanCache(double stalenessThreshold) {
Key delegatingKey = key.key;
Tuple<ShardId, Integer> shardIdInfo = new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode);
if (cleanupKeysFromFullClean.contains(shardIdInfo) || cleanupKeysFromClosedShards.contains(shardIdInfo)) {
//iterator.remove();
removeKey(threadPool, phaser, key);
} else {
CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null);
if (cacheEntity == null) {
// If cache entity is null, it means that index or shard got deleted/closed meanwhile.
// So we will delete this key.
dimensionListsToDrop.add(key.dimensions);
//iterator.remove();
removeKey(threadPool, phaser, key);
} else {
CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId);
if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
//iterator.remove();
removeKey(threadPool, phaser, key);
}
}
Expand All @@ -819,12 +815,6 @@ private synchronized void cleanCache(double stalenessThreshold) {
removeBatch(threadPool, phaser);
}
phaser.arriveAndAwaitAdvance(); // Wait for all batches to signal completion
// How to await the pool being done?
// This is only TODO proof of concept
/*removalPool.shutdown();
try {
removalPool.awaitTermination(1, TimeUnit.MINUTES);
} catch (Exception ignored) {}*/

for (List<String> closedDimensions : dimensionListsToDrop) {
// Invalidate a dummy key containing the dimensions we need to drop stats for
Expand All @@ -833,9 +823,6 @@ private synchronized void cleanCache(double stalenessThreshold) {
cache.invalidate(dummyKey);
}
cache.refresh();
/*try {
Thread.sleep(1000); // TODO: Debug only for tests
} catch (Exception ignored) {}*/
}

private void removeKey(ThreadPool pool, Phaser phaser, ICacheKey<Key> key) {
Expand All @@ -862,20 +849,6 @@ private synchronized void removeBatch(ThreadPool pool, Phaser phaser) {
});
}

private void removeKey(ThreadPoolExecutor pool, ICacheKey<Key> key) {
// Shouldn't have to worry about other threads interfering with contents of batch. This should only be called by one thread at a time
// so long as cleanupCache isn't running over itself.
currentDeletionBatch.add(key);
if (currentDeletionBatch.size() >= KEYS_TO_DELETE_BATCH_SIZE) {
pool.execute(() -> {
for (ICacheKey<Key> batchKey : currentDeletionBatch) {
cache.invalidate(batchKey);
}
});
currentDeletionBatch = new HashSet<>();
}
}

/**
* Determines whether the cache cleanup process can be skipped based on the staleness threshold.
*
Expand Down

0 comments on commit b774ae2

Please sign in to comment.