diff --git a/CHANGELOG.md b/CHANGELOG.md index a6d2488caa010..3e7229a2a0336 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163)) - Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626)) - Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063)) +- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533)) - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) ### Dependencies diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index ebffdde0f3699..10bab9b3fce92 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -121,3 +121,7 @@ ${path.logs} # Once there is no observed impact on performance, this feature flag can be removed. # #opensearch.experimental.optimization.datetime_formatter_caching.enabled: false +# +# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting. +# +#opensearch.experimental.feature.pluggable.caching.enabled: false diff --git a/modules/cache-common/build.gradle b/modules/cache-common/build.gradle index c7052896e609b..98cdec83b9ad1 100644 --- a/modules/cache-common/build.gradle +++ b/modules/cache-common/build.gradle @@ -6,6 +6,8 @@ * compatible open source license. */ +apply plugin: 'opensearch.internal-cluster-test' + opensearchplugin { description 'Module for caches which are optional and do not require additional security permission' classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin' diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java new file mode 100644 index 0000000000000..568ac4d188c51 --- /dev/null +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchType; +import org.opensearch.client.Client; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.plugins.CachePlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.greaterThan; + +public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() + ).getKey(), + MockDiskCache.MockDiskCacheFactory.NAME + ) + .build(); + } + + public void testPluginsAreInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream() + .anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin")) + ); + } + + public void testSanityChecksWithIndicesRequestCache() throws InterruptedException { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("f", "type=date") + .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build()) + .get() + ); + indexRandom( + true, + client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"), + client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z") + ); + ensureSearchable("index"); + + // This is not a random example: serialization with time zones writes shared strings + // which used to not work well with the query cache because of the handles stream output + // see #9500 + final SearchResponse r1 = client.prepareSearch("index") + .setSize(0) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .addAggregation( + dateHistogram("histo").field("f") + .timeZone(ZoneId.of("+01:00")) + .minDocCount(0) + .dateHistogramInterval(DateHistogramInterval.MONTH) + ) + .get(); + assertSearchResponse(r1); + + // The cached is actually used + assertThat( + client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), + greaterThan(0L) + ); + } + + public static class MockDiskCachePlugin extends Plugin implements CachePlugin { + + public MockDiskCachePlugin() {} + + @Override + public Map getCacheFactoryMap() { + return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000)); + } + + @Override + public String getName() { + return "mock_disk_plugin"; + } + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java new file mode 100644 index 0000000000000..79b57b80c3aa0 --- /dev/null +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.common.tier; + +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.store.builders.ICacheBuilder; +import org.opensearch.common.cache.store.config.CacheConfig; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class MockDiskCache implements ICache { + + Map cache; + int maxSize; + long delay; + + public MockDiskCache(int maxSize, long delay) { + this.maxSize = maxSize; + this.delay = delay; + this.cache = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + return value; + } + + @Override + public void put(K key, V value) { + if (this.cache.size() >= maxSize) { // For simplification + return; + } + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.cache.put(key, value); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) { + V value = cache.computeIfAbsent(key, key1 -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + return value; + } + + @Override + public void invalidate(K key) { + this.cache.remove(key); + } + + @Override + public void invalidateAll() { + this.cache.clear(); + } + + @Override + public Iterable keys() { + return this.cache.keySet(); + } + + @Override + public long count() { + return this.cache.size(); + } + + @Override + public void refresh() {} + + @Override + public void close() { + + } + + public static class MockDiskCacheFactory implements Factory { + + public static final String NAME = "mockDiskCache"; + final long delay; + final int maxSize; + + public MockDiskCacheFactory(long delay, int maxSize) { + this.delay = delay; + this.maxSize = maxSize; + } + + @Override + public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { + return new Builder().setMaxSize(maxSize).setDeliberateDelay(delay).build(); + } + + @Override + public String getCacheName() { + return NAME; + } + } + + public static class Builder extends ICacheBuilder { + + int maxSize; + long delay; + + @Override + public ICache build() { + return new MockDiskCache(this.maxSize, this.delay); + } + + public Builder setMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public Builder setDeliberateDelay(long millis) { + this.delay = millis; + return this; + } + } +} diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 7c9569f5defe2..2f7938934300e 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -13,19 +13,19 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; -import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -105,7 +105,7 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) @@ -126,8 +126,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ); @@ -164,7 +164,7 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ).getKey(), - MockOnDiskCache.MockDiskCacheFactory.NAME + MockDiskCache.MockDiskCacheFactory.NAME ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) @@ -187,8 +187,8 @@ public void testWithFactoryCreationWithOnHeapCacheNotPresent() { Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ) ); @@ -232,8 +232,8 @@ public void testWithFactoryCreationWithDiskCacheNotPresent() { Map.of( OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(), - MockOnDiskCache.MockDiskCacheFactory.NAME, - new MockOnDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) + MockDiskCache.MockDiskCacheFactory.NAME, + new MockDiskCache.MockDiskCacheFactory(0, randomIntBetween(100, 300)) ) ) ); @@ -256,6 +256,11 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { .setRemovalListener(removalListener) .setSettings( Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -266,7 +271,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { ) .build(); - ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(0, diskCacheSize); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize); TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() .setOnHeapCacheFactory(onHeapCacheFactory) @@ -444,6 +449,10 @@ public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception { diskCacheSize, removalListener, Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -741,13 +750,18 @@ public void testConcurrencyForEvictionFlow() throws Exception { MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); - ICache.Factory diskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(500, diskCacheSize); + ICache.Factory diskCacheFactory = new MockDiskCache.MockDiskCacheFactory(500, diskCacheSize); CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) .setKeyType(String.class) .setWeigher((k, v) -> 150) .setRemovalListener(removalListener) .setSettings( Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") .put( OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) .get(MAXIMUM_SIZE_IN_BYTES_KEY) @@ -858,10 +872,19 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setKeyType(String.class) .setWeigher((k, v) -> keyValueSize) .setRemovalListener(removalListener) - .setSettings(settings) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) .build(); - ICache.Factory mockDiskCacheFactory = new MockOnDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize); return new TieredSpilloverCache.Builder().setCacheType(CacheType.INDICES_REQUEST_CACHE) .setRemovalListener(removalListener) @@ -871,118 +894,3 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .build(); } } - -class MockOnDiskCache implements ICache { - - Map cache; - int maxSize; - long delay; - - MockOnDiskCache(int maxSize, long delay) { - this.maxSize = maxSize; - this.delay = delay; - this.cache = new ConcurrentHashMap(); - } - - @Override - public V get(K key) { - V value = cache.get(key); - return value; - } - - @Override - public void put(K key, V value) { - if (this.cache.size() >= maxSize) { // For simplification - return; - } - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - this.cache.put(key, value); - } - - @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) { - V value = cache.computeIfAbsent(key, key1 -> { - try { - return loader.load(key); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - return value; - } - - @Override - public void invalidate(K key) { - this.cache.remove(key); - } - - @Override - public void invalidateAll() { - this.cache.clear(); - } - - @Override - public Iterable keys() { - return this.cache.keySet(); - } - - @Override - public long count() { - return this.cache.size(); - } - - @Override - public void refresh() {} - - @Override - public void close() { - - } - - public static class MockDiskCacheFactory implements Factory { - - static final String NAME = "mockDiskCache"; - final long delay; - final int maxSize; - - MockDiskCacheFactory(long delay, int maxSize) { - this.delay = delay; - this.maxSize = maxSize; - } - - @Override - public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { - return new Builder().setMaxSize(maxSize).setDeliberateDelay(delay).build(); - } - - @Override - public String getCacheName() { - return NAME; - } - } - - public static class Builder extends ICacheBuilder { - - int maxSize; - long delay; - - @Override - public ICache build() { - return new MockOnDiskCache(this.maxSize, this.delay); - } - - public Builder setMaxSize(int maxSize) { - this.maxSize = maxSize; - return this; - } - - public Builder setDeliberateDelay(long millis) { - this.delay = millis; - return this; - } - } -} diff --git a/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java new file mode 100644 index 0000000000000..c68455463ee3d --- /dev/null +++ b/plugins/cache-ehcache/src/internalClusterTest/java/org/opensearch/cache/EhcacheDiskCacheIT.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache; + +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(EhcacheCachePlugin.class); + } + + public void testPluginsAreInstalled() { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); + NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); + List pluginInfos = nodesInfoResponse.getNodes() + .stream() + .flatMap( + (Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() + ) + .collect(Collectors.toList()); + Assert.assertTrue( + pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.EhcacheCachePlugin")) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 82577eb1501f3..52b4dad553180 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -77,7 +78,9 @@ public IndicesRequestCacheIT(Settings settings) { public static Collection parameters() { return Arrays.asList( new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() }, + new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "false").build() } ); } diff --git a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java index c6e970b58ea08..b6710e5e4b424 100644 --- a/server/src/main/java/org/opensearch/common/cache/service/CacheService.java +++ b/server/src/main/java/org/opensearch/common/cache/service/CacheService.java @@ -8,12 +8,15 @@ package org.opensearch.common.cache.service; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.cache.settings.CacheSettings; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.Map; @@ -21,6 +24,7 @@ /** * Service responsible to create caches. */ +@ExperimentalApi public class CacheService { private final Map cacheStoreTypeFactories; @@ -42,8 +46,13 @@ public ICache createCache(CacheConfig config, CacheType cache cacheType.getSettingPrefix() ); String storeName = cacheSettingForCacheType.get(settings); - if (storeName == null || storeName.isBlank()) { - throw new IllegalArgumentException("No configuration exists for cache type: " + cacheType); + if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) { + // Condition 1: In case feature flag is off, we default to onHeap. + // Condition 2: In case storeName is not explicitly mentioned, we assume user is looking to use older + // settings, so we again fallback to onHeap to maintain backward compatibility. + // It is guaranteed that we will have this store name registered, so + // should be safe. + storeName = OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME; } if (!cacheStoreTypeFactories.containsKey(storeName)) { throw new IllegalArgumentException("No store name: [" + storeName + "] is registered for cache type: " + cacheType); diff --git a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java index eb4563fda2275..43a047f0f22c6 100644 --- a/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/settings/CacheSettings.java @@ -28,7 +28,7 @@ public class CacheSettings { (key) -> Setting.simpleString(key, "", Setting.Property.NodeScope) ); - public static Setting getConcreteSettingForCacheType(CacheType cacheType) { + public static Setting getConcreteStoreNameSettingForCacheType(CacheType cacheType) { return CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()); } } diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java index d218903de5b6d..c9bec4ba47def 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -15,15 +15,19 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.unit.ByteSizeValue; import java.util.Map; +import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_KEY; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; /** @@ -111,9 +115,22 @@ public static class OpenSearchOnHeapCacheFactory implements Factory { public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { Map> settingList = OpenSearchOnHeapCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); - return new Builder().setMaximumWeightInBytes( + ICacheBuilder builder = new Builder().setMaximumWeightInBytes( ((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes() - ).setWeigher(config.getWeigher()).setRemovalListener(config.getRemovalListener()).build(); + ) + .setExpireAfterAccess(((TimeValue) settingList.get(EXPIRE_AFTER_ACCESS_KEY).get(settings))) + .setWeigher(config.getWeigher()) + .setRemovalListener(config.getRemovalListener()); + Setting cacheSettingForCacheType = CacheSettings.CACHE_TYPE_STORE_NAME.getConcreteSettingForNamespace( + cacheType.getSettingPrefix() + ); + String storeName = cacheSettingForCacheType.get(settings); + if (!FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings) || (storeName == null || storeName.isBlank())) { + // For backward compatibility as the user intent is to use older settings. + builder.setMaximumWeightInBytes(config.getMaxSizeInBytes()); + builder.setExpireAfterAccess(config.getExpireAfterAccess()); + } + return builder.build(); } @Override diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 6fefea6578fb9..fa82e9be72e6e 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -11,6 +11,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import java.util.function.ToLongBiFunction; @@ -41,12 +42,24 @@ public class CacheConfig { private final RemovalListener removalListener; + /** + * Max size in bytes for the cache. This is needed for backward compatibility. + */ + private final long maxSizeInBytes; + + /** + * Defines the expiration time for a cache entry. This is needed for backward compatibility. + */ + private final TimeValue expireAfterAccess; + private CacheConfig(Builder builder) { this.keyType = builder.keyType; this.valueType = builder.valueType; this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.maxSizeInBytes = builder.maxSizeInBytes; + this.expireAfterAccess = builder.expireAfterAccess; } public Class getKeyType() { @@ -69,6 +82,14 @@ public ToLongBiFunction getWeigher() { return weigher; } + public Long getMaxSizeInBytes() { + return maxSizeInBytes; + } + + public TimeValue getExpireAfterAccess() { + return expireAfterAccess; + } + /** * Builder class to build Cache config related parameters. * @param Type of key. @@ -86,6 +107,10 @@ public static class Builder { private ToLongBiFunction weigher; + private long maxSizeInBytes; + + private TimeValue expireAfterAccess; + public Builder() {} public Builder setSettings(Settings settings) { @@ -113,6 +138,16 @@ public Builder setWeigher(ToLongBiFunction weigher) { return this; } + public Builder setMaxSizeInBytes(long sizeInBytes) { + this.maxSizeInBytes = sizeInBytes; + return this; + } + + public Builder setExpireAfterAccess(TimeValue expireAfterAccess) { + this.expireAfterAccess = expireAfterAccess; + return this; + } + public CacheConfig build() { return new CacheConfig<>(this); } diff --git a/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java b/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java index bfd2d937fb430..5a2964ad011bf 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java +++ b/server/src/main/java/org/opensearch/common/cache/store/settings/OpenSearchOnHeapCacheSettings.java @@ -11,6 +11,7 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.unit.ByteSizeValue; import java.util.HashMap; @@ -33,9 +34,25 @@ public class OpenSearchOnHeapCacheSettings { (key) -> Setting.memorySizeSetting(key, "1%", NodeScope) ); + /** + * Setting to define expire after access. + * + * Setting pattern: {cache_type}.opensearch_onheap.expire + */ + public static final Setting.AffixSetting EXPIRE_AFTER_ACCESS_SETTING = Setting.suffixKeySetting( + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ".expire", + (key) -> Setting.positiveTimeSetting(key, TimeValue.MAX_VALUE, Setting.Property.NodeScope) + ); + public static final String MAXIMUM_SIZE_IN_BYTES_KEY = "maximum_size_in_bytes"; + public static final String EXPIRE_AFTER_ACCESS_KEY = "expire_after_access"; - private static final Map> KEY_SETTING_MAP = Map.of(MAXIMUM_SIZE_IN_BYTES_KEY, MAXIMUM_SIZE_IN_BYTES); + private static final Map> KEY_SETTING_MAP = Map.of( + MAXIMUM_SIZE_IN_BYTES_KEY, + MAXIMUM_SIZE_IN_BYTES, + EXPIRE_AFTER_ACCESS_KEY, + EXPIRE_AFTER_ACCESS_SETTING + ); public static final Map>> CACHE_TYPE_MAP = getCacheTypeMap(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8695ff878c8bc..5090010198a5d 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -81,6 +81,8 @@ import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -730,6 +732,8 @@ public void apply(Settings value, Settings current, Settings previous) { TelemetrySettings.METRICS_PUBLISH_INTERVAL_SETTING, TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING, TelemetrySettings.METRICS_FEATURE_ENABLED_SETTING - ) + ), + List.of(FeatureFlags.PLUGGABLE_CACHE), + List.of(CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE)) ); } diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 47da53b52c325..4cf7f22c014dd 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -36,6 +36,7 @@ protected FeatureFlagSettings( FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, FeatureFlags.DOC_ID_FUZZY_SET_SETTING, - FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING + FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, + FeatureFlags.PLUGGABLE_CACHE_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index b51efeab21254..9e202a5bfd143 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -64,6 +64,12 @@ public class FeatureFlags { */ public static final String DOC_ID_FUZZY_SET = "opensearch.experimental.optimize_doc_id_lookup.fuzzy_set.enabled"; + /** + * Gates the functionality of pluggable cache. + * Enables OpenSearch to use pluggable caches with respective store names via setting. + */ + public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled"; + /** * Should store the settings from opensearch.yml. */ @@ -128,4 +134,6 @@ public static boolean isEnabled(Setting featureFlag) { ); public static final Setting DOC_ID_FUZZY_SET_SETTING = Setting.boolSetting(DOC_ID_FUZZY_SET, false, Property.NodeScope); + + public static final Setting PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 6d5c23274dbd6..92fb278c946f1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -39,11 +39,13 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.CheckedSupplier; -import org.opensearch.common.cache.Cache; -import org.opensearch.common.cache.CacheBuilder; -import org.opensearch.common.cache.CacheLoader; +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.service.CacheService; +import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -69,6 +71,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; +import java.util.function.ToLongBiFunction; /** * The indices request cache allows to cache a shard level request stage responses, helping with improving @@ -116,22 +119,26 @@ public final class IndicesRequestCache implements RemovalListener keysToClean = ConcurrentCollections.newConcurrentSet(); private final ByteSizeValue size; private final TimeValue expire; - private final Cache cache; + private final ICache cache; private final Function> cacheEntityLookup; - IndicesRequestCache(Settings settings, Function> cacheEntityFunction) { + IndicesRequestCache(Settings settings, Function> cacheEntityFunction, CacheService cacheService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); - CacheBuilder cacheBuilder = CacheBuilder.builder() - .setMaximumWeight(sizeInBytes) - .weigher((k, v) -> k.ramBytesUsed() + v.ramBytesUsed()) - .removalListener(this); - if (expire != null) { - cacheBuilder.setExpireAfterAccess(expire); - } - cache = cacheBuilder.build(); + ToLongBiFunction weigher = (k, v) -> k.ramBytesUsed() + v.ramBytesUsed(); this.cacheEntityLookup = cacheEntityFunction; + this.cache = cacheService.createCache( + new CacheConfig.Builder().setSettings(settings) + .setWeigher(weigher) + .setValueType(BytesReference.class) + .setKeyType(Key.class) + .setRemovalListener(this) + .setMaxSizeInBytes(sizeInBytes) // for backward compatibility + .setExpireAfterAccess(expire) // for backward compatibility + .build(), + CacheType.INDICES_REQUEST_CACHE + ); } @Override @@ -204,7 +211,7 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade * * @opensearch.internal */ - private static class Loader implements CacheLoader { + private static class Loader implements LoadAwareCacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -403,7 +410,7 @@ synchronized void cleanCache() { /** * Returns the current size of the cache */ - int count() { + long count() { return cache.count(); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index c83f2a4c5cd5d..8151c151e3968 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -62,6 +62,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -395,7 +396,8 @@ public IndicesService( Supplier repositoriesServiceSupplier, SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, - RecoverySettings recoverySettings + RecoverySettings recoverySettings, + CacheService cacheService ) { this.settings = settings; this.threadPool = threadPool; @@ -412,7 +414,7 @@ public IndicesService( return Optional.empty(); } return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), cacheService); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 8322aaaea9798..3ef3ae4f6230e 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -823,7 +823,8 @@ protected Node( repositoriesServiceReference::get, searchRequestStats, remoteStoreStatsTrackerFactory, - recoverySettings + recoverySettings, + cacheService ); final IngestService ingestService = new IngestService( diff --git a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java index 9d39f8a43ea58..b355161f6f310 100644 --- a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java @@ -10,19 +10,20 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.test.OpenSearchTestCase; import java.util.List; import java.util.Map; -import static junit.framework.TestCase.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -33,22 +34,72 @@ public class CacheServiceTests extends OpenSearchTestCase { public void testWithCreateCacheForIndicesRequestCacheType() { CachePlugin mockPlugin1 = mock(CachePlugin.class); ICache.Factory factory1 = mock(ICache.Factory.class); - Map factoryMap = Map.of("cache1", factory1); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); - Setting indicesRequestCacheSetting = CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); - - CacheModule cacheModule = new CacheModule( - List.of(mockPlugin1), + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + CacheService cacheService = new CacheService( + factoryMap, Settings.builder().put(indicesRequestCacheSetting.getKey(), "cache1").build() ); CacheConfig config = mock(CacheConfig.class); - ICache onHeapCache = mock(OpenSearchOnHeapCache.class); - when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(onHeapCache); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(onHeapCacheFactory.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); + + ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertEquals(mockOnHeapCache, ircCache); + } + + public void testWithCreateCacheForIndicesRequestCacheTypeWithFeatureFlagTrue() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + CacheService cacheService = new CacheService( + factoryMap, + Settings.builder().put(indicesRequestCacheSetting.getKey(), "cache1").put(FeatureFlags.PLUGGABLE_CACHE, "true").build() + ); + CacheConfig config = mock(CacheConfig.class); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); - CacheService cacheService = cacheModule.getCacheService(); ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); - assertEquals(onHeapCache, ircCache); + assertEquals(mockOnHeapCache, ircCache); + } + + public void testWithCreateCacheForIndicesRequestCacheTypeWithFeatureFlagTrueAndStoreNameIsNull() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + Map factoryMap = Map.of( + "cache1", + factory1, + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME, + onHeapCacheFactory + ); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + CacheService cacheService = new CacheService(factoryMap, Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build()); + CacheConfig config = mock(CacheConfig.class); + ICache mockOnHeapCache = mock(OpenSearchOnHeapCache.class); + when(onHeapCacheFactory.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(mockOnHeapCache); + + ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertEquals(mockOnHeapCache, ircCache); } public void testWithCreateCacheWithNoStoreNamePresentForCacheType() { @@ -61,12 +112,29 @@ public void testWithCreateCacheWithNoStoreNamePresentForCacheType() { IllegalArgumentException.class, () -> cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE) ); - assertEquals("No configuration exists for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + assertEquals("No store name: [opensearch_onheap] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + } + + public void testWithCreateCacheWithDefaultStoreNameForIRC() { + CachePlugin mockPlugin1 = mock(CachePlugin.class); + ICache.Factory factory1 = mock(ICache.Factory.class); + Map factoryMap = Map.of("cache1", factory1); + when(mockPlugin1.getCacheFactoryMap()).thenReturn(factoryMap); + + CacheModule cacheModule = new CacheModule(List.of(mockPlugin1), Settings.EMPTY); + CacheConfig config = mock(CacheConfig.class); + when(config.getSettings()).thenReturn(Settings.EMPTY); + when(config.getWeigher()).thenReturn((k, v) -> 100); + when(config.getRemovalListener()).thenReturn(mock(RemovalListener.class)); + + CacheService cacheService = cacheModule.getCacheService(); + ICache iCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); + assertTrue(iCache instanceof OpenSearchOnHeapCache); } public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { ICache.Factory factory1 = mock(ICache.Factory.class); - Setting indicesRequestCacheSetting = CacheSettings.getConcreteSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); + Setting indicesRequestCacheSetting = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE); Map factoryMap = Map.of("cache1", factory1); CacheService cacheService = new CacheService( factoryMap, @@ -81,6 +149,6 @@ public void testWithCreateCacheWithInvalidStoreNameAssociatedForCacheType() { IllegalArgumentException.class, () -> cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE) ); - assertEquals("No store name: [cache] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); + assertEquals("No store name: [opensearch_onheap] is registered for cache type: INDICES_REQUEST_CACHE", ex.getMessage()); } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 73728aec12e51..b9cbbb2c65162 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -46,9 +46,12 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.cache.module.CacheModule; +import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; @@ -67,6 +70,7 @@ import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Optional; import java.util.UUID; @@ -80,7 +84,69 @@ public void testBasicOperationsCache() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() + ); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + + // initial cache + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); + assertEquals(0, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertFalse(loader.loadedFromCache); + assertEquals(1, cache.count()); + + // cache hit + entity = new IndicesService.IndexShardCacheEntity(indexShard); + loader = new Loader(reader, 0); + value = cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(1, cache.count()); + assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > value.length()); + assertEquals(1, cache.numRegisteredCloseListeners()); + + // Closing the cache doesn't modify an already returned CacheEntity + if (randomBoolean()) { + reader.close(); + } else { + indexShard.close("test", true, true); // closed shard but reader is still open + cache.clear(entity); + } + cache.cleanCache(); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(1, requestCacheStats.stats().getMissCount()); + assertEquals(0, requestCacheStats.stats().getEvictions()); + assertTrue(loader.loadedFromCache); + assertEquals(0, cache.count()); + assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); + + IOUtils.close(reader, writer, dir, cache); + assertEquals(0, cache.numRegisteredCloseListeners()); + } + + public void testBasicOperationsCacheWithFeatureFlag() throws Exception { + IndexShard indexShard = createIndex("test").getShard(0); + CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + cacheService ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -146,7 +212,7 @@ public void testCacheDifferentReaders() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -247,7 +313,8 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -274,7 +341,8 @@ public void testEviction() throws Exception { IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService() ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -319,7 +387,7 @@ public void testClearAllEntityIdentity() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -414,7 +482,7 @@ public void testInvalidate() throws Exception { return Optional.empty(); } return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - })); + }), new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService()); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 7c50e961853b5..33f3577e15c52 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -157,6 +157,7 @@ import org.opensearch.common.CheckedConsumer; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; +import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; @@ -239,6 +240,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -2072,7 +2074,8 @@ public void onFailure(final Exception e) { repositoriesServiceReference::get, null, new RemoteStoreStatsTrackerFactory(clusterService, settings), - DefaultRecoverySettings.INSTANCE + DefaultRecoverySettings.INSTANCE, + new CacheModule(new ArrayList<>(), settings).getCacheService() ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService(