forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Tiered Caching] Cache tier policies (opensearch-project#12542)
* Adds policy interface and took time policy impl Signed-off-by: Peter Alfonsi <[email protected]> * Changes IndicesService to write a CachePolicyInfoWrapper before the QSR Signed-off-by: Peter Alfonsi <[email protected]> * Moved took time logic from QSR to IndicesService Signed-off-by: Peter Alfonsi <[email protected]> * spotlessApply Signed-off-by: Peter Alfonsi <[email protected]> * Addressed ansjcy's comments Signed-off-by: Peter Alfonsi <[email protected]> * Partial rebase on most recent changes Signed-off-by: Peter Alfonsi <[email protected]> * Integrated policies with new TSC changes Signed-off-by: Peter Alfonsi <[email protected]> * Reverted unintended change to idea/vcs.xml Signed-off-by: Peter Alfonsi <[email protected]> * javadocs Signed-off-by: Peter Alfonsi <[email protected]> * github actions Signed-off-by: Peter Alfonsi <[email protected]> * Set default threshold value to 10 ms Signed-off-by: Peter Alfonsi <[email protected]> * Addressed Sorabh's comments Signed-off-by: Peter Alfonsi <[email protected]> * Addressed Sorabh's second round of comments Signed-off-by: Peter Alfonsi <[email protected]> * Set cachedQueryParser in IRC Signed-off-by: Peter Alfonsi <[email protected]> * Addressed Sorabh's comments besides dynamic setting Signed-off-by: Peter Alfonsi <[email protected]> * Removed dynamic setting, misc comments Signed-off-by: Peter Alfonsi <[email protected]> * Added changelog entry Signed-off-by: Peter Alfonsi <[email protected]> * Added missing javadoc Signed-off-by: Peter Alfonsi <[email protected]> * Fixed failed gradle run Signed-off-by: Peter Alfonsi <[email protected]> * Added setting validation test Signed-off-by: Peter Alfonsi <[email protected]> * rerun gradle for flaky IT Signed-off-by: Peter Alfonsi <[email protected]> * javadocs Signed-off-by: Peter Alfonsi <[email protected]> --------- Signed-off-by: Peter Alfonsi <[email protected]> Co-authored-by: Peter Alfonsi <[email protected]>
- Loading branch information
1 parent
0ad6b5e
commit ccdf3ff
Showing
14 changed files
with
645 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
70 changes: 70 additions & 0 deletions
70
modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.cache.common.policy; | ||
|
||
import org.opensearch.common.cache.policy.CachedQueryResult; | ||
import org.opensearch.common.unit.TimeValue; | ||
|
||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
|
||
/** | ||
* A cache tier policy which accepts queries whose took time is greater than some threshold. | ||
* The threshold should be set to approximately the time it takes to get a result from the cache tier. | ||
* The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed | ||
* to decide whether to admit the value. | ||
* @param <V> The type of data consumed by test(). | ||
*/ | ||
public class TookTimePolicy<V> implements Predicate<V> { | ||
/** | ||
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through. | ||
*/ | ||
private final TimeValue threshold; | ||
|
||
/** | ||
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult | ||
*/ | ||
private final Function<V, CachedQueryResult.PolicyValues> cachedResultParser; | ||
|
||
/** | ||
* Constructs a took time policy. | ||
* @param threshold the threshold | ||
* @param cachedResultParser the function providing policy values | ||
*/ | ||
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) { | ||
if (threshold.compareTo(TimeValue.ZERO) < 0) { | ||
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep()); | ||
} | ||
this.threshold = threshold; | ||
this.cachedResultParser = cachedResultParser; | ||
} | ||
|
||
/** | ||
* Check whether to admit data. | ||
* @param data the input argument | ||
* @return whether to admit the data | ||
*/ | ||
public boolean test(V data) { | ||
long tookTimeNanos; | ||
try { | ||
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos(); | ||
} catch (Exception e) { | ||
// If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data | ||
return false; | ||
} | ||
|
||
TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos); | ||
return tookTime.compareTo(threshold) >= 0; | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
modules/cache-common/src/main/java/org/opensearch/cache/common/policy/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** A package for policies controlling what can enter caches. */ | ||
package org.opensearch.cache.common.policy; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
...es/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.cache.common.policy; | ||
|
||
import org.apache.lucene.search.ScoreDoc; | ||
import org.apache.lucene.search.TopDocs; | ||
import org.apache.lucene.search.TotalHits; | ||
import org.opensearch.common.Randomness; | ||
import org.opensearch.common.cache.policy.CachedQueryResult; | ||
import org.opensearch.common.io.stream.BytesStreamOutput; | ||
import org.opensearch.common.lucene.search.TopDocsAndMaxScore; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.search.DocValueFormat; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import java.io.IOException; | ||
import java.util.Random; | ||
import java.util.function.Function; | ||
|
||
public class TookTimePolicyTests extends OpenSearchTestCase { | ||
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> { | ||
try { | ||
return CachedQueryResult.getPolicyValues(data); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}; | ||
|
||
private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) { | ||
return new TookTimePolicy<>(threshold, transformationFunction); | ||
} | ||
|
||
public void testTookTimePolicy() throws Exception { | ||
double threshMillis = 10; | ||
long shortMillis = (long) (0.9 * threshMillis); | ||
long longMillis = (long) (1.5 * threshMillis); | ||
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis)); | ||
BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000); | ||
BytesReference longTime = getValidPolicyInput(longMillis * 1000000); | ||
|
||
boolean shortResult = tookTimePolicy.test(shortTime); | ||
assertFalse(shortResult); | ||
boolean longResult = tookTimePolicy.test(longTime); | ||
assertTrue(longResult); | ||
|
||
TookTimePolicy<BytesReference> disabledPolicy = getTookTimePolicy(TimeValue.ZERO); | ||
shortResult = disabledPolicy.test(shortTime); | ||
assertTrue(shortResult); | ||
longResult = disabledPolicy.test(longTime); | ||
assertTrue(longResult); | ||
} | ||
|
||
public void testNegativeOneInput() throws Exception { | ||
// PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason | ||
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(TimeValue.ZERO); | ||
BytesReference minusOne = getValidPolicyInput(-1L); | ||
assertFalse(tookTimePolicy.test(minusOne)); | ||
} | ||
|
||
public void testInvalidThreshold() throws Exception { | ||
assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE)); | ||
} | ||
|
||
private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException { | ||
// When it's used in the cache, the policy will receive BytesReferences which come from | ||
// serializing a CachedQueryResult. | ||
CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos); | ||
BytesStreamOutput out = new BytesStreamOutput(); | ||
cachedQueryResult.writeToNoId(out); | ||
return out.bytes(); | ||
} | ||
|
||
private QuerySearchResult getQSR() { | ||
// We can't mock the QSR with mockito because the class is final. Construct a real one | ||
QuerySearchResult mockQSR = new QuerySearchResult(); | ||
|
||
// duplicated from DfsQueryPhaseTests.java | ||
mockQSR.topDocs( | ||
new TopDocsAndMaxScore( | ||
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), | ||
2.0F | ||
), | ||
new DocValueFormat[0] | ||
); | ||
return mockQSR; | ||
} | ||
|
||
private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException { | ||
Random rand = Randomness.get(); | ||
byte[] bytes = new byte[numBytes]; | ||
rand.nextBytes(bytes); | ||
out.writeBytes(bytes); | ||
} | ||
} |
Oops, something went wrong.