Skip to content

Commit

Permalink
Test case changes and refactoring.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Oct 4, 2024
1 parent 656022d commit 204ae8c
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 212 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {

private final Set<Metric> requestedMetrics = new HashSet<>();
private final Set<IndexMetric> indexMetricsRequested = new HashSet<>();
private Boolean computeAllMetric = true;
private Boolean computeAllMetrics = true;

public ClusterStatsRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
useAggregatedNodeLevelResponses = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
computeAllMetric = in.readOptionalBoolean();
computeAllMetrics = in.readOptionalBoolean();
final long longMetricsFlags = in.readLong();
for (Metric metric : Metric.values()) {
if ((longMetricsFlags & (1 << metric.getIndex())) != 0) {
Expand Down Expand Up @@ -95,11 +95,11 @@ public void useAggregatedNodeLevelResponses(boolean useAggregatedNodeLevelRespon
}

public boolean computeAllMetrics() {
return computeAllMetric;
return computeAllMetrics;
}

public void computeAllMetrics(boolean computeAllMetrics) {
this.computeAllMetric = computeAllMetrics;
this.computeAllMetrics = computeAllMetrics;
}

/**
Expand Down Expand Up @@ -136,7 +136,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(useAggregatedNodeLevelResponses);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(computeAllMetric);
out.writeOptionalBoolean(computeAllMetrics);
long longMetricFlags = 0;
for (Metric metric : requestedMetrics) {
longMetricFlags |= (1 << metric.getIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
}

/**
* A metric is required when: all cluster stats are required (OR) if the metric was requested
* A metric is required when: all cluster stats are required (OR) if the metric is requested
* @param metric
* @param clusterStatsRequest
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
package org.opensearch.rest.action.admin.cluster;

import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetric;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.common.Strings;
import org.opensearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -79,15 +81,15 @@ public List<Route> routes() {

static {
Map<String, Consumer<ClusterStatsRequest>> metricRequestConsumerMap = new HashMap<>();
for (ClusterStatsRequest.Metric metric : ClusterStatsRequest.Metric.values()) {
for (Metric metric : Metric.values()) {
metricRequestConsumerMap.put(metric.metricName(), request -> request.addMetric(metric));
}
METRIC_REQUEST_CONSUMER_MAP = Collections.unmodifiableMap(metricRequestConsumerMap);
}

static {
Map<String, Consumer<ClusterStatsRequest>> metricMap = new HashMap<>();
for (ClusterStatsRequest.IndexMetric indexMetric : ClusterStatsRequest.IndexMetric.values()) {
for (IndexMetric indexMetric : IndexMetric.values()) {
metricMap.put(indexMetric.metricName(), request -> request.addIndexMetric(indexMetric));
}
INDEX_METRIC_TO_REQUEST_CONSUMER_MAP = Collections.unmodifiableMap(metricMap);
Expand All @@ -106,89 +108,34 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

public static ClusterStatsRequest fromRequest(final RestRequest request) {
Set<String> metrics = Strings.tokenizeByCommaToSet(request.param("metric", "_all"));
String indicesMetricsDefaultValue = metrics.contains(Metric.INDICES.metricName()) || metrics.contains("_all") ? "_all" : null;
Set<String> indexMetrics = Strings.tokenizeByCommaToSet(request.param("index_metric", indicesMetricsDefaultValue));
String[] nodeIds = request.paramAsStringArray("nodeId", null);

ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(nodeIds);
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.useAggregatedNodeLevelResponses(true);

if (!metrics.isEmpty()) {
if (metrics.size() > 1 && metrics.contains("_all")) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains _all and individual metrics [%s]",
request.path(),
request.param("metric")
)
);
}

final Set<String> metricsRequested = new HashSet<>();
if (metrics.contains("_all")) {
metricsRequested.addAll(METRIC_REQUEST_CONSUMER_MAP.keySet());
} else {
metricsRequested.addAll(metrics);
}

Set<String> indexMetrics = Strings.tokenizeByCommaToSet(
request.param("index_metric", metricsRequested.contains(ClusterStatsRequest.Metric.INDICES.metricName()) ? "_all" : null)
);

if (indexMetrics.size() > 1 && indexMetrics.contains("_all")) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains _all and individual index metrics [%s]",
request.path(),
request.param("index_metric")
)
);
}

final Set<String> invalidMetrics = new TreeSet<>();
for (String metric : metricsRequested) {
Consumer<ClusterStatsRequest> clusterStatsRequestConsumer = METRIC_REQUEST_CONSUMER_MAP.get(metric);
if (clusterStatsRequestConsumer != null) {
clusterStatsRequestConsumer.accept(clusterStatsRequest);
} else {
invalidMetrics.add(metric);
}
}
paramValidations(metrics, indexMetrics, request);
final Set<String> metricsRequested = metrics.contains("_all")
? new HashSet<>(METRIC_REQUEST_CONSUMER_MAP.keySet())
: new HashSet<>(metrics);
Set<String> invalidMetrics = validateAndSetRequestedMetrics(metricsRequested, METRIC_REQUEST_CONSUMER_MAP, clusterStatsRequest);
if (!invalidMetrics.isEmpty()) {
throw new IllegalArgumentException(
unrecognizedStrings(request, invalidMetrics, METRIC_REQUEST_CONSUMER_MAP.keySet(), "metric")
);
}

if (!metricsRequested.contains(ClusterStatsRequest.Metric.INDICES.metricName()) && !indexMetrics.isEmpty()) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains index metrics [%s] but indices stats not requested",
request.path(),
request.param("index_metric")
)
if (metricsRequested.contains(Metric.INDICES.metricName())) {
final Set<String> indexMetricsRequested = indexMetrics.contains("_all")
? INDEX_METRIC_TO_REQUEST_CONSUMER_MAP.keySet()
: new HashSet<>(indexMetrics);
Set<String> invalidIndexMetrics = validateAndSetRequestedMetrics(
indexMetricsRequested,
INDEX_METRIC_TO_REQUEST_CONSUMER_MAP,
clusterStatsRequest
);
}

if (metricsRequested.contains(ClusterStatsRequest.Metric.INDICES.metricName())) {
final Set<String> indexMetricsRequested = new HashSet<>();
if (indexMetrics.contains("_all")) {
indexMetricsRequested.addAll(INDEX_METRIC_TO_REQUEST_CONSUMER_MAP.keySet());
} else {
indexMetricsRequested.addAll(indexMetrics);
}
final Set<String> invalidIndexMetrics = new TreeSet<>();
for (String indexMetric : indexMetricsRequested) {
Consumer<ClusterStatsRequest> clusterStatsRequestConsumer = INDEX_METRIC_TO_REQUEST_CONSUMER_MAP.get(indexMetric);
if (clusterStatsRequestConsumer != null) {
clusterStatsRequestConsumer.accept(clusterStatsRequest);
} else {
invalidIndexMetrics.add(indexMetric);
}
}

if (!invalidIndexMetrics.isEmpty()) {
throw new IllegalArgumentException(
unrecognizedStrings(request, invalidIndexMetrics, INDEX_METRIC_TO_REQUEST_CONSUMER_MAP.keySet(), "index metric")
Expand All @@ -197,9 +144,62 @@ public static ClusterStatsRequest fromRequest(final RestRequest request) {
}
clusterStatsRequest.computeAllMetrics(false);
}

return clusterStatsRequest;
}

private static void paramValidations(Set<String> metrics, Set<String> indexMetrics, RestRequest request) {
if (metrics.size() > 1 && metrics.contains("_all")) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains _all and individual metrics [%s]",
request.path(),
request.param("metric")
)
);
}

if (indexMetrics.size() > 1 && indexMetrics.contains("_all")) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains _all and individual index metrics [%s]",
request.path(),
request.param("index_metric")
)
);
}

if (!metrics.contains(Metric.INDICES.metricName()) && !metrics.contains("_all") && !indexMetrics.isEmpty()) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"request [%s] contains index metrics [%s] but indices stats not requested",
request.path(),
request.param("index_metric")
)
);
}
}

private static Set<String> validateAndSetRequestedMetrics(
Set<String> metrics,
Map<String, Consumer<ClusterStatsRequest>> metricConsumerMap,
ClusterStatsRequest clusterStatsRequest
) {
final Set<String> invalidMetrics = new TreeSet<>();
for (String metric : metrics) {
Consumer<ClusterStatsRequest> clusterStatsRequestConsumer = metricConsumerMap.get(metric);
if (clusterStatsRequestConsumer != null) {
clusterStatsRequestConsumer.accept(clusterStatsRequest);
} else {
invalidMetrics.add(metric);
}
}
return invalidMetrics;
}

@Override
public boolean canTripCircuitBreaker() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,41 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;

import java.util.Set;

public class ClusterStatsRequestTests extends OpenSearchTestCase {

public void testSerialization() throws Exception {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest();
clusterStatsRequest.computeAllMetrics(randomBoolean());
clusterStatsRequest.addMetric(ClusterStatsRequest.Metric.OS);
clusterStatsRequest.addMetric(ClusterStatsRequest.Metric.PLUGINS);
clusterStatsRequest.addMetric(ClusterStatsRequest.Metric.INDICES);
clusterStatsRequest.addIndexMetric(ClusterStatsRequest.IndexMetric.SHARDS);
clusterStatsRequest.addIndexMetric(ClusterStatsRequest.IndexMetric.QUERY_CACHE);
clusterStatsRequest.addIndexMetric(ClusterStatsRequest.IndexMetric.MAPPINGS);
clusterStatsRequest.useAggregatedNodeLevelResponses(randomBoolean());
public void testSerializationWithVersion3x() throws Exception {
ClusterStatsRequest clusterStatsRequest = getClusterStatsRequest();

Version testVersion = Version.V_3_0_0;

BytesStreamOutput output = new BytesStreamOutput();
output.setVersion(testVersion);
clusterStatsRequest.writeTo(output);

StreamInput streamInput = output.bytes().streamInput();
streamInput.setVersion(testVersion);
ClusterStatsRequest deserializedClusterStatsRequest = new ClusterStatsRequest(streamInput);

validateClusterStatsRequest(
Set.of(ClusterStatsRequest.Metric.OS, ClusterStatsRequest.Metric.PLUGINS, ClusterStatsRequest.Metric.INDICES),
Set.of(
ClusterStatsRequest.IndexMetric.SHARDS,
ClusterStatsRequest.IndexMetric.QUERY_CACHE,
ClusterStatsRequest.IndexMetric.MAPPINGS
),
Version.V_3_0_0,
deserializedClusterStatsRequest
);
assertEquals(-1, streamInput.read());
}

Version testVersion = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
public void testSerializationOnVersionBelow3x() throws Exception {
ClusterStatsRequest clusterStatsRequest = getClusterStatsRequest();

Version testVersion = Version.V_2_17_0;

BytesStreamOutput output = new BytesStreamOutput();
output.setVersion(testVersion);
Expand All @@ -37,13 +56,47 @@ public void testSerialization() throws Exception {
streamInput.setVersion(testVersion);
ClusterStatsRequest deserializedClusterStatsRequest = new ClusterStatsRequest(streamInput);

assertEquals(clusterStatsRequest.computeAllMetrics(), deserializedClusterStatsRequest.computeAllMetrics());
assertEquals(clusterStatsRequest.requestedMetrics(), deserializedClusterStatsRequest.requestedMetrics());
assertEquals(clusterStatsRequest.indicesMetrics(), deserializedClusterStatsRequest.indicesMetrics());
assertEquals(
clusterStatsRequest.useAggregatedNodeLevelResponses(),
deserializedClusterStatsRequest.useAggregatedNodeLevelResponses()
validateClusterStatsRequest(
Set.of(ClusterStatsRequest.Metric.OS, ClusterStatsRequest.Metric.PLUGINS, ClusterStatsRequest.Metric.INDICES),
Set.of(
ClusterStatsRequest.IndexMetric.SHARDS,
ClusterStatsRequest.IndexMetric.QUERY_CACHE,
ClusterStatsRequest.IndexMetric.MAPPINGS
),
Version.V_2_17_0,
deserializedClusterStatsRequest
);
assertEquals(-1, streamInput.read());
}

private ClusterStatsRequest getClusterStatsRequest() {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest();
clusterStatsRequest.computeAllMetrics(true);
clusterStatsRequest.addMetric(ClusterStatsRequest.Metric.OS);
clusterStatsRequest.addMetric(ClusterStatsRequest.Metric.PLUGINS);
clusterStatsRequest.addMetric(ClusterStatsRequest.Metric.INDICES);
clusterStatsRequest.addIndexMetric(ClusterStatsRequest.IndexMetric.SHARDS);
clusterStatsRequest.addIndexMetric(ClusterStatsRequest.IndexMetric.QUERY_CACHE);
clusterStatsRequest.addIndexMetric(ClusterStatsRequest.IndexMetric.MAPPINGS);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return clusterStatsRequest;
}

private void validateClusterStatsRequest(
Set<ClusterStatsRequest.Metric> metrics,
Set<ClusterStatsRequest.IndexMetric> indexMetrics,
Version version,
ClusterStatsRequest deserializedClusterStatsRequest
) {
if (version.before(Version.V_3_0_0)) {
assertEquals(true, deserializedClusterStatsRequest.computeAllMetrics());
assertTrue(deserializedClusterStatsRequest.requestedMetrics().isEmpty());
assertTrue(deserializedClusterStatsRequest.indicesMetrics().isEmpty());
} else {
assertEquals(true, deserializedClusterStatsRequest.computeAllMetrics());
assertEquals(metrics, deserializedClusterStatsRequest.requestedMetrics());
assertEquals(indexMetrics, deserializedClusterStatsRequest.indicesMetrics());
}
}

}
Loading

0 comments on commit 204ae8c

Please sign in to comment.