Skip to content

Commit

Permalink
Add test cases for cluster stats request/response serde.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 30, 2024
1 parent 38a27c5 commit 7cd85cc
Show file tree
Hide file tree
Showing 11 changed files with 539 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetric;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -320,6 +321,52 @@ public void testValuesSmokeScreen() throws IOException, ExecutionException, Inte
assertEquals(msg, OsStats.calculatePercentage(free, total), response.nodesStats.getOs().getMem().getFreePercent());
}

public void testValuesSmokeScreenWithMetricFilter() throws IOException, ExecutionException, InterruptedException {
internalCluster().startNodes(randomIntBetween(1, 3));
index("test1", "type", "1", "f", "f");

ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.computeAllMetrics(false)
.requestMetrics(Set.of(Metric.values()))
.indexMetrics(Set.of(IndexMetric.values()))
.get();
String msg = response.toString();
assertThat(msg, response.getTimestamp(), Matchers.greaterThan(946681200000L)); // 1 Jan 2000
assertThat(msg, response.indicesStats.getStore().getSizeInBytes(), Matchers.greaterThan(0L));

assertThat(msg, response.nodesStats.getFs().getTotal().getBytes(), Matchers.greaterThan(0L));
assertThat(msg, response.nodesStats.getJvm().getVersions().size(), Matchers.greaterThan(0));

assertThat(msg, response.nodesStats.getVersions().size(), Matchers.greaterThan(0));
assertThat(msg, response.nodesStats.getVersions().contains(Version.CURRENT), Matchers.equalTo(true));
assertThat(msg, response.nodesStats.getPlugins().size(), Matchers.greaterThanOrEqualTo(0));

assertThat(msg, response.nodesStats.getProcess().count, Matchers.greaterThan(0));
// 0 happens when not supported on platform
assertThat(msg, response.nodesStats.getProcess().getAvgOpenFileDescriptors(), Matchers.greaterThanOrEqualTo(0L));
// these can be -1 if not supported on platform
assertThat(msg, response.nodesStats.getProcess().getMinOpenFileDescriptors(), Matchers.greaterThanOrEqualTo(-1L));
assertThat(msg, response.nodesStats.getProcess().getMaxOpenFileDescriptors(), Matchers.greaterThanOrEqualTo(-1L));

NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().addMetric(OS.metricName()).get();
long total = 0;
long free = 0;
long used = 0;
for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
total += nodeStats.getOs().getMem().getTotal().getBytes();
free += nodeStats.getOs().getMem().getFree().getBytes();
used += nodeStats.getOs().getMem().getUsed().getBytes();
}
assertEquals(msg, free, response.nodesStats.getOs().getMem().getFree().getBytes());
assertEquals(msg, total, response.nodesStats.getOs().getMem().getTotal().getBytes());
assertEquals(msg, used, response.nodesStats.getOs().getMem().getUsed().getBytes());
assertEquals(msg, OsStats.calculatePercentage(used, total), response.nodesStats.getOs().getMem().getUsedPercent());
assertEquals(msg, OsStats.calculatePercentage(free, total), response.nodesStats.getOs().getMem().getFreePercent());
}

public void testAllocatedProcessors() throws Exception {
// start one node with 7 processors.
internalCluster().startNode(Settings.builder().put(OpenSearchExecutors.NODE_PROCESSORS_SETTING.getKey(), 7).build());
Expand Down Expand Up @@ -360,6 +407,9 @@ public void testFieldTypes() {
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.computeAllMetrics(randomBoolean())
.requestMetrics(Set.of(Metric.INDICES))
.indexMetrics(Set.of(IndexMetric.MAPPINGS))
.get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
assertTrue(response.getIndicesStats().getMappings().getFieldTypeStats().isEmpty());
Expand Down Expand Up @@ -518,7 +568,7 @@ public void testClusterStatsApplyMetricFilterDisabled() throws IOException {
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean());
assertFalse(clusterStatsRequestBuilder.request().applyMetricFiltering());
assertTrue(clusterStatsRequestBuilder.request().computeAllMetrics());

ClusterStatsResponse response = clusterStatsRequestBuilder.get();
assertNotNull(response.getNodesStats());
Expand All @@ -528,7 +578,7 @@ public void testClusterStatsApplyMetricFilterDisabled() throws IOException {
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.applyMetricFiltering(false)
.computeAllMetrics(true)
.requestMetrics(Set.of(Metric.FS, Metric.JVM, Metric.PLUGINS, Metric.OS))
.get();

Expand Down Expand Up @@ -578,7 +628,7 @@ public void testClusterStatsWithMetricsFilter() {
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean());
assertFalse(clusterStatsRequestBuilder.request().applyMetricFiltering());
assertTrue(clusterStatsRequestBuilder.request().computeAllMetrics());

ClusterStatsResponse response = clusterStatsRequestBuilder.get();
assertNotNull(response);
Expand All @@ -590,7 +640,7 @@ public void testClusterStatsWithMetricsFilter() {
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(ClusterStatsNodes.NODE_STATS_METRICS)
.applyMetricFiltering(true)
.computeAllMetrics(false)
.get();
assertNotNull(statsResponseWithAllNodeStatsMetrics);
assertNotNull(statsResponseWithAllNodeStatsMetrics.getNodesStats());
Expand All @@ -615,7 +665,7 @@ public void testClusterStatsWithMetricsFilter() {
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.PLUGINS, Metric.INDICES))
.applyMetricFiltering(true)
.computeAllMetrics(false)
.get();
assertNotNull(statsResponseWithIndicesRequestMetrics);
assertNotNull(statsResponseWithIndicesRequestMetrics.getNodesStats());
Expand All @@ -633,12 +683,14 @@ public void testClusterStatsWithIndexMetricFilter() {
ensureGreen();

client().admin().indices().prepareCreate("test1").setMapping("{\"properties\":{\"foo\":{\"type\": \"keyword\"}}}").get();
IndexRequest indexRequest = new IndexRequest("test1").id("doc_id").source("{\"test_type\" : \"metrics_filter\"}");
client().index(indexRequest);

ClusterStatsRequestBuilder clusterStatsRequestBuilder = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean());
assertFalse(clusterStatsRequestBuilder.request().applyMetricFiltering());
assertTrue(clusterStatsRequestBuilder.request().computeAllMetrics());

ClusterStatsResponse response = clusterStatsRequestBuilder.get();
assertNotNull(response);
Expand All @@ -651,7 +703,7 @@ public void testClusterStatsWithIndexMetricFilter() {
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.INDICES))
.indexMetrics(Set.of(IndexMetric.MAPPINGS, IndexMetric.ANALYSIS))
.applyMetricFiltering(true)
.computeAllMetrics(false)
.get();
assertNotNull(statsResponseWithSpecificIndicesMetrics);
assertNull(statsResponseWithSpecificIndicesMetrics.getNodesStats());
Expand All @@ -666,7 +718,7 @@ public void testClusterStatsWithIndexMetricFilter() {
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.INDICES))
.indexMetrics(Set.of(IndexMetric.values()))
.applyMetricFiltering(true)
.computeAllMetrics(false)
.get();
assertNotNull(statsResponseWithAllIndicesMetrics);
assertNull(statsResponseWithAllIndicesMetrics.getNodesStats());
Expand All @@ -690,6 +742,32 @@ public void testClusterStatsWithIndexMetricFilter() {
assertNotNull(statsResponseWithAllIndicesMetrics.getIndicesStats().getStore());
}

public void testClusterStatsWithIndexMetricWithDocsFilter() throws IOException {
internalCluster().startNode();
createIndex("test1");

client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field1", "value1").execute().actionGet();
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field2", "value2").execute().actionGet();
refreshAndWaitForReplication();

ClusterStatsResponse statsResponseWithAllIndicesMetrics = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(Metric.INDICES))
.indexMetrics(Set.of(IndexMetric.DOCS))
.computeAllMetrics(true)
.get();
assertNotNull(statsResponseWithAllIndicesMetrics);
assertNull(statsResponseWithAllIndicesMetrics.getNodesStats());
assertNotNull(statsResponseWithAllIndicesMetrics.getIndicesStats());
assertNull(statsResponseWithAllIndicesMetrics.getIndicesStats().getShards());
assertNotNull(statsResponseWithAllIndicesMetrics.getIndicesStats().getDocs());
assertEquals(2, statsResponseWithAllIndicesMetrics.getIndicesStats().getDocs().getCount());
assertEquals(0, statsResponseWithAllIndicesMetrics.getIndicesStats().getDocs().getDeleted());
assertTrue(statsResponseWithAllIndicesMetrics.getIndicesStats().getDocs().getAverageSizeInBytes() > 0);
}

private Map<String, Integer> getExpectedCounts(
int dataRoleCount,
int masterRoleCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {

private final Set<Metric> requestedMetrics = new HashSet<>();
private final Set<IndexMetric> indexMetricsRequested = new HashSet<>();
private Boolean applyMetricFiltering = false;
private Boolean computeAllMetric = true;

public ClusterStatsRequest(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
useAggregatedNodeLevelResponses = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
applyMetricFiltering = in.readOptionalBoolean();
computeAllMetric = in.readOptionalBoolean();
final long longMetricsFlags = in.readLong();
for (Metric metric : Metric.values()) {
if ((longMetricsFlags & (1 << metric.getIndex())) != 0) {
Expand Down Expand Up @@ -94,12 +94,12 @@ public void useAggregatedNodeLevelResponses(boolean useAggregatedNodeLevelRespon
this.useAggregatedNodeLevelResponses = useAggregatedNodeLevelResponses;
}

public boolean applyMetricFiltering() {
return applyMetricFiltering;
public boolean computeAllMetrics() {
return computeAllMetric;
}

public void applyMetricFiltering(boolean honourMetricFiltering) {
this.applyMetricFiltering = honourMetricFiltering;
public void computeAllMetrics(boolean computeAllMetrics) {
this.computeAllMetric = computeAllMetrics;
}

/**
Expand Down Expand Up @@ -136,7 +136,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(useAggregatedNodeLevelResponses);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(applyMetricFiltering);
out.writeOptionalBoolean(computeAllMetric);
long longMetricFlags = 0;
for (Metric metric : requestedMetrics) {
longMetricFlags |= (1 << metric.getIndex());
Expand Down Expand Up @@ -192,13 +192,15 @@ public int getIndex() {
*/
@PublicApi(since = "3.0.0")
public enum IndexMetric {
// Metrics computed from ShardStats
SHARDS("shards", 0),
DOCS("docs", 1),
STORE("store", 2),
FIELDDATA("fielddata", 3),
QUERY_CACHE("query_cache", 4),
COMPLETION("completion", 5),
SEGMENTS("segments", 6),
// Metrics computed from ClusterState
ANALYSIS("analysis", 7),
MAPPINGS("mappings", 8);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public final ClusterStatsRequestBuilder useAggregatedNodeLevelResponses(boolean
return this;
}

public final ClusterStatsRequestBuilder applyMetricFiltering(boolean applyMetricFiltering) {
request.applyMetricFiltering(applyMetricFiltering);
public final ClusterStatsRequestBuilder computeAllMetrics(boolean applyMetricFiltering) {
request.computeAllMetrics(applyMetricFiltering);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -94,6 +95,21 @@ public class TransportClusterStatsAction extends TransportNodesAction<
CommonStatsFlags.Flag.Segments
);

private static final Map<CommonStatsFlags.Flag, ClusterStatsRequest.IndexMetric> SHARDS_STATS_FLAG_MAP_TO_INDEX_METRIC = Map.of(
CommonStatsFlags.Flag.Docs,
ClusterStatsRequest.IndexMetric.DOCS,
CommonStatsFlags.Flag.Store,
ClusterStatsRequest.IndexMetric.STORE,
CommonStatsFlags.Flag.FieldData,
ClusterStatsRequest.IndexMetric.FIELDDATA,
CommonStatsFlags.Flag.QueryCache,
ClusterStatsRequest.IndexMetric.QUERY_CACHE,
CommonStatsFlags.Flag.Completion,
ClusterStatsRequest.IndexMetric.COMPLETION,
CommonStatsFlags.Flag.Segments,
ClusterStatsRequest.IndexMetric.SEGMENTS
);

private final NodeService nodeService;
private final IndicesService indicesService;

Expand Down Expand Up @@ -133,16 +149,14 @@ protected ClusterStatsResponse newResponse(
+ " the cluster state that are too slow for a transport thread"
);
ClusterState state = clusterService.state();
if (request.applyMetricFiltering()) {
if (request.computeAllMetrics()) {
return new ClusterStatsResponse(
System.currentTimeMillis(),
state.metadata().clusterUUID(),
clusterService.getClusterName(),
responses,
failures,
state,
request.requestedMetrics(),
request.indicesMetrics()
state
);
} else {
return new ClusterStatsResponse(
Expand All @@ -151,7 +165,9 @@ protected ClusterStatsResponse newResponse(
clusterService.getClusterName(),
responses,
failures,
state
state,
request.requestedMetrics(),
request.indicesMetrics()
);
}
}
Expand All @@ -169,21 +185,19 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false, false);
boolean applyMetricFiltering = nodeRequest.request.applyMetricFiltering();
Set<Metric> requestedMetrics = nodeRequest.request.requestedMetrics();
NodeStats nodeStats = nodeService.stats(
CommonStatsFlags.NONE,
isMetricRequired(applyMetricFiltering, Metric.OS, requestedMetrics),
isMetricRequired(applyMetricFiltering, Metric.PROCESS, requestedMetrics),
isMetricRequired(applyMetricFiltering, Metric.JVM, requestedMetrics),
isMetricRequired(Metric.OS, nodeRequest.request),
isMetricRequired(Metric.PROCESS, nodeRequest.request),
isMetricRequired(Metric.JVM, nodeRequest.request),
false,
isMetricRequired(applyMetricFiltering, Metric.FS, requestedMetrics),
isMetricRequired(Metric.FS, nodeRequest.request),
false,
false,
false,
false,
false,
isMetricRequired(applyMetricFiltering, Metric.INGEST, requestedMetrics),
isMetricRequired(Metric.INGEST, nodeRequest.request),
false,
false,
false,
Expand All @@ -201,13 +215,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false
);
List<ShardStats> shardsStats = new ArrayList<>();
if (isMetricRequired(applyMetricFiltering, Metric.INDICES, requestedMetrics)) {
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
for (ClusterStatsRequest.IndexMetric indexMetric : nodeRequest.request.indicesMetrics()) {
if (INDEX_METRIC_TO_SHARDS_STATS_FLAG_MAP.containsKey(indexMetric)) {
commonStatsFlags.set(INDEX_METRIC_TO_SHARDS_STATS_FLAG_MAP.get(indexMetric), true);
}
}
if (isMetricRequired(Metric.INDICES, nodeRequest.request)) {
CommonStatsFlags commonStatsFlags = getCommonStatsFlags(nodeRequest);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
Expand Down Expand Up @@ -255,8 +264,29 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
);
}

private boolean isMetricRequired(boolean applyMetricFilter, Metric metric, Set<Metric> requestedMetrics) {
return !applyMetricFilter || requestedMetrics.contains(metric);
/**
* A metric is required when: all cluster stats are required (OR) if the metric was requested
* @param metric
* @param clusterStatsRequest
* @return
*/
private boolean isMetricRequired(Metric metric, ClusterStatsRequest clusterStatsRequest) {
return clusterStatsRequest.computeAllMetrics() || clusterStatsRequest.requestedMetrics().contains(metric);
}

private static CommonStatsFlags getCommonStatsFlags(ClusterStatsNodeRequest nodeRequest) {
Set<CommonStatsFlags.Flag> requestedCommonStatsFlags = new HashSet<>();
if (nodeRequest.request.computeAllMetrics()) {
requestedCommonStatsFlags.addAll(INDEX_METRIC_TO_SHARDS_STATS_FLAG_MAP.values());
} else {
for (Map.Entry<CommonStatsFlags.Flag, ClusterStatsRequest.IndexMetric> entry : SHARDS_STATS_FLAG_MAP_TO_INDEX_METRIC
.entrySet()) {
if (nodeRequest.request.indicesMetrics().contains(entry.getValue())) {
requestedCommonStatsFlags.add(entry.getKey());
}
}
}
return new CommonStatsFlags(requestedCommonStatsFlags.toArray(new CommonStatsFlags.Flag[0]));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public QueryCacheStats(long ramBytesUsed, long hitCount, long missCount, long ca
}

public void add(QueryCacheStats stats) {
if (stats == null) {
return;
}
ramBytesUsed += stats.ramBytesUsed;
hitCount += stats.hitCount;
missCount += stats.missCount;
Expand Down
Loading

0 comments on commit 7cd85cc

Please sign in to comment.