Skip to content

Commit

Permalink
Fixed multiple disk tier instance issue, made spillover test pass
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Oct 17, 2023
1 parent d152478 commit 2d200fc
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,4 @@ public void dec(long n) {
public long count() {
return counter.sum();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,30 @@
package org.opensearch.indices;

import org.ehcache.PersistentCacheManager;
import org.ehcache.config.CacheRuntimeConfiguration;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.internal.statistics.DefaultStatisticsService;
import org.ehcache.core.spi.service.StatisticsService;
import org.ehcache.core.statistics.TierStatistics;
import org.ehcache.event.CacheEvent;
import org.ehcache.event.CacheEventListener;
import org.ehcache.event.EventFiring;
import org.ehcache.event.EventOrdering;
import org.ehcache.event.EventType;
import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsAction;
import org.opensearch.common.ExponentiallyWeightedMovingAverage;
import org.opensearch.common.cache.RemovalListener;
import org.ehcache.Cache;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;

public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCache.Key, BytesReference>, RemovalListener<IndicesRequestCache.Key, BytesReference> {
// & Writeable.Reader<K> ?

public static PersistentCacheManager cacheManager;
private Cache<EhcacheKey, BytesReference> cache;
Expand Down Expand Up @@ -124,7 +114,22 @@ private void getOrCreateCache(boolean isPersistent, long maxWeightInBytes) {
.withService(listenerConfig));
} catch (IllegalArgumentException e) {
// Thrown when the cache already exists, which may happen in test cases
// In this case the listener is configured to send messages to some other disk tier instance, which we don't want
// (it was set up unnecessarily by the test case)

// change config of existing cache to use this listener rather than the one instantiated by the test case
cache = cacheManager.getCache(cacheAlias, EhcacheKey.class, BytesReference.class);
// cache.getRuntimeConfiguration().cacheConfigurationListenerList contains the old listener, but it's private
// and theres no method to clear it unless you have the actual listener object, so it has to stay i think

cache.getRuntimeConfiguration().registerCacheEventListener(listener, EventOrdering.ORDERED, EventFiring.ASYNCHRONOUS,
EnumSet.of(
EventType.EVICTED,
EventType.EXPIRED,
EventType.REMOVED,
EventType.UPDATED,
EventType.CREATED));
int k = 1;
}
}

Expand Down Expand Up @@ -204,6 +209,7 @@ public Iterable<IndicesRequestCache.Key> keys() {

@Override
public int count() {
int j = 0;
return (int) count.count();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.opensearch.core.common.bytes.BytesReference;

public class EhcacheEventListener implements CacheEventListener<EhcacheKey, BytesReference> {
// Receives key-value pairs (BytesReference, BytesReference), but must transform into (Key, BytesReference)
// Receives key-value pairs (EhcacheKey, BytesReference), but must transform into (Key, BytesReference)
// to send removal notifications
private final RemovalListener<IndicesRequestCache.Key, BytesReference> removalListener;
private final EhcacheDiskCachingTier tier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ public long count() {
return totalCount;
}

public long count(TierType tierType) {
for (CachingTier<K, V> cachingTier : cachingTierList) {
if (cachingTier.getTierType() == tierType) {
return cachingTier.count();
}
}
return -1L;
}

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
if (RemovalReason.EVICTED.equals(notification.getRemovalReason())) {
Expand Down Expand Up @@ -173,7 +182,7 @@ public Builder<K, V> setTieredCacheEventListener(TieredCacheEventListener<K, V>
public TieredCacheSpilloverStrategyHandler<K, V> build() {
return new TieredCacheSpilloverStrategyHandler<K, V>(
this.onHeapCachingTier,
this.diskCachingTier, // not sure why it was yelling about this, it already is an EhcacheDiskCachingTier
this.diskCachingTier,
this.tieredCacheEventListener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,8 @@
import org.ehcache.event.EventType;
import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.bytes.AbstractBytesReference;
Expand Down Expand Up @@ -164,128 +161,29 @@ public void testAddDirectToEhcache() throws Exception {
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
AtomicBoolean indexShard = new AtomicBoolean(true);
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9];
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
IndicesRequestCache.Key key = cache.new Key(entity, termBytes, rKey);

//TestBytesReference value = new TestBytesReference(124);
BytesReference value = new BytesArray(new byte[]{0});
cache.tieredCacheHandler.getDiskCachingTier().put(key, value);

System.out.println("Size: " + cache.tieredCacheHandler.getDiskCachingTier().count());
BytesReference res = cache.tieredCacheHandler.getDiskCachingTier().get(key);
assertEquals(value, res);
assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));

IOUtils.close(reader, writer, dir, cache);
cache.closeDiskTier();
}

/*public void testSimpleEhcache() throws Exception {
// for debug only, delete
CounterMetric count = new CounterMetric();
String cacheAlias = "dummy";
class DummyRemovalListener implements RemovalListener<Integer, String> {
public DummyRemovalListener() { }
@Override
public void onRemoval(RemovalNotification<Integer, String> notification) {
System.out.println(":)");
}
}
CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder
.newEventListenerConfiguration(new EhcacheEventListener<Integer, String>(new DummyRemovalListener(), count),
EventType.EVICTED,
EventType.EXPIRED,
EventType.REMOVED,
EventType.UPDATED,
EventType.CREATED)
.ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous()
StatisticsService statsService = new DefaultStatisticsService();
PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool("default", 0, 4)
.build();
PersistentCacheManager cacheManager;
boolean doIntCache = false;
if (doIntCache) {
cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
.using(statsService) // https://stackoverflow.com/questions/40453859/how-to-get-ehcache-3-1-statistics
.using(threadConfig)
.with(CacheManagerBuilder.persistence(EhcacheDiskCachingTier.DISK_CACHE_FP))
.withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder(
Integer.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(10, MemoryUnit.MB, false))
.withService(listenerConfig)
).build(true);
Cache<Integer, String> integerCache = cacheManager.getCache(cacheAlias, Integer.class, String.class);
integerCache.put(0, "blorp");
System.out.println("Counter value = " + count.count());
String res = integerCache.get(0);
System.out.println("Got result " + res);
System.out.println("Counter value = " + count.count());
} else {
cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
.using(statsService) // https://stackoverflow.com/questions/40453859/how-to-get-ehcache-3-1-statistics
.using(threadConfig)
.with(CacheManagerBuilder.persistence(EhcacheDiskCachingTier.DISK_CACHE_FP))
.withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder(
DummySerializableKey.class, String.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(10, MemoryUnit.MB, false))
.withService(listenerConfig)
).build(true);
Cache<DummySerializableKey, String> cache = cacheManager.getCache(cacheAlias, DummySerializableKey.class, String.class);
DummySerializableKey key = new DummySerializableKey(Integer.valueOf(0), "blah");
cache.put(key, "blorp");
System.out.println("Counter value = " + count.count());
String res = cache.get(key);
System.out.println("Got result " + res);
System.out.println("Counter value = " + count.count());
TierStatistics ts = statsService.getCacheStatistics(cacheAlias).getTierStatistics().get("Disk");
System.out.println("self-reported count = " + ts.getMappings());
System.out.println("self-reported misses = " + ts.getMisses());
System.out.println("self-reported hits = " + ts.getHits());
List<Cache.Entry<DummySerializableKey, String>> foos = new ArrayList<>();
for(Cache.Entry<DummySerializableKey, String> entry : cache) {
foos.add(entry);
}
int j = 0;
j++;
System.out.println(j);
}
/*Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
AtomicBoolean indexShard = new AtomicBoolean(true);
ShardRequestCache requestCacheStats = new ShardRequestCache();
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes());
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9];
TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
IndicesRequestCache.Key key = new IndicesRequestCache.Key(entity, reader.getReaderCacheHelper().getKey(), termBytes);*/


/*cacheManager.removeCache(cacheAlias);
cacheManager.close();
//IOUtils.close(reader, writer, dir);
}*/

public void testSpillover() throws Exception {
// fill the on-heap cache until we spill over
ShardRequestCache requestCacheStats = new ShardRequestCache();
Settings.Builder settingsBuilder = Settings.builder();
long heapSizeBytes = 1000; // each of these queries is 115 bytes, so we can fit 8 in the heap cache
long heapSizeBytes = 1000; // each of these queries is 131 bytes, so we can fit 7 in the heap cache
int heapKeySize = 131;
int maxNumInHeap = 1000 / heapKeySize;
settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));

Expand All @@ -298,8 +196,8 @@ public void testSpillover() throws Exception {
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes());
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[9];
for (int i = 0; i < 9; i++) {
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[maxNumInHeap + 1];
for (int i = 0; i < maxNumInHeap + 1; i++) {
TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i));
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
Expand All @@ -310,15 +208,18 @@ public void testSpillover() throws Exception {
}
// attempt to get value from disk cache, the first key should have been evicted
BytesReference firstValue = cache.tieredCacheHandler.get(keys[0]);
System.out.println("Final on-heap cache size = " + requestCacheStats.stats().getMemorySizeInBytes()); // is correctly 920
//System.out.println("Final self-reported disk size = " + cache.tieredCacheHandler.getDiskWeightBytes()); // is 0, should be 115
System.out.println("On-heap tier evictions = " + requestCacheStats.stats().getEvictions()); // is correctly 1
System.out.println("Disk tier hits = " + requestCacheStats.stats(TierType.DISK).getHitCount()); // should be 1, is 0 bc keys not serializable
System.out.println("Disk tier misses = " + requestCacheStats.stats(TierType.DISK).getMissCount()); // should be 9, is 10 bc keys not serializable
//System.out.println("Disk tier self-reported misses = " + cache.tieredCacheHandler.getDiskCachingTier().getMisses()); // should be same as other one
System.out.println("On-heap tier hits = " + requestCacheStats.stats().getHitCount()); // is correctly 0
System.out.println("On-heap tier misses = " + requestCacheStats.stats().getMissCount()); // is correctly 10
System.out.println("Disk count = " + cache.tieredCacheHandler.getDiskCachingTier().count()); // should be 1, is 0

assertEquals(maxNumInHeap * heapKeySize, requestCacheStats.stats().getMemorySizeInBytes());
// TODO: disk weight bytes
assertEquals(1, requestCacheStats.stats().getEvictions());
assertEquals(1, requestCacheStats.stats(TierType.DISK).getHitCount());
assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount());
assertEquals(0, requestCacheStats.stats().getHitCount());
assertEquals(maxNumInHeap + 2, requestCacheStats.stats().getMissCount());
assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP));
assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));

// more?
IOUtils.close(reader, writer, dir, cache);
cache.closeDiskTier();
}
Expand Down

0 comments on commit 2d200fc

Please sign in to comment.