Skip to content

Commit

Permalink
Optimize NodeIndicesStats output behind flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranshu-S committed Jun 19, 2024
1 parent 93d507a commit 59a0f5d
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand All @@ -19,21 +21,26 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.hamcrest.MatcherAssert;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonMap;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

Expand Down Expand Up @@ -243,6 +250,53 @@ public void testNodeIndicesStatsDocStatusStatsCreateDeleteUpdate() {
}
}

/**
 * Verifies the behavior of the optimized nodes-stats response flag:
 * with the flag off, per-shard stats are returned; with the flag on,
 * neither map is populated unless a "level" is requested, in which case
 * exactly one of the index-level / shard-level maps is filled.
 */
public void testNodeIndicesStatsResponseOptimised() {
    internalCluster().startNode();
    ensureGreen();
    String indexName = "test1";
    index(indexName, "type", "1", "f", "f");
    refresh();
    ClusterState clusterState = client().admin().cluster().prepareState().get().getState();

    // Default behavior (flag off): shard-level stats present, index-level rollup absent.
    NodesStatsResponse response = client().admin().cluster().prepareNodesStats().get();
    response.getNodes().forEach(nodeStats -> {
        assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
        assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
    });

    assertAcked(
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder().put(IndicesService.OPTIMIZED_NODES_STATS, true))
            .get()
    );

    try {
        // Flag on, no level requested: neither map is populated.
        response = client().admin().cluster().prepareNodesStats().get();
        response.getNodes().forEach(nodeStats -> {
            assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
            assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
        });

        // Flag on, level=indices: only the per-index rollup is populated.
        CommonStatsFlags indicesLevelFlags = new CommonStatsFlags().setLevels(new String[] { "indices" });
        response = client().admin().cluster().prepareNodesStats().setIndices(indicesLevelFlags).get();
        response.getNodes().forEach(nodeStats -> {
            assertNotNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
            assertNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
        });

        // Flag on, level=shards: only the per-shard stats are populated.
        CommonStatsFlags shardsLevelFlags = new CommonStatsFlags().setLevels(new String[] { "shards" });
        response = client().admin().cluster().prepareNodesStats().setIndices(shardsLevelFlags).get();
        response.getNodes().forEach(nodeStats -> {
            assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
            assertNull(nodeStats.getIndices().getIndexStats(clusterState.metadata().index(indexName).getIndex()));
        });
    } finally {
        // Reset the transient setting so it cannot leak into other tests
        // sharing this cluster scope.
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setTransientSettings(Settings.builder().putNull(IndicesService.OPTIMIZED_NODES_STATS))
                .get()
        );
    }
}

private void assertDocStatusStats() {
DocStatusStats docStatusStats = client().admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,9 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING
SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

IndicesService.OPTIMIZED_NODES_STATS_SETTING
)
)
);
Expand Down
21 changes: 20 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ public class IndicesService extends AbstractLifecycleComponent
Setting.Property.NodeScope
);

/** Settings key for the experimental flag gating the optimized nodes-stats response. */
public static final String OPTIMIZED_NODES_STATS = "opensearch.experimental.optimization.nodes_stats.enabled";

/**
 * Dynamic, node-scoped boolean setting (default {@code false}). When enabled,
 * {@code stats(CommonStatsFlags)} builds the {@code NodeIndicesStats} via the
 * levels-aware constructor instead of the legacy one.
 */
public static final Setting<Boolean> OPTIMIZED_NODES_STATS_SETTING = Setting.boolSetting(
OPTIMIZED_NODES_STATS,
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// volatile: read on stats-serving threads, written by the settings-update consumer.
// NOTE(review): public mutable field — consider making this private (a private
// setter already exists) unless tests require direct access; confirm.
public volatile boolean optimizedNodesStatsEnabled;

/**
* Used to specify SEGMENT replication type as the default replication strategy for all indices in a cluster. By default, this is false.
*/
Expand Down Expand Up @@ -433,6 +444,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}
});
this.optimizedNodesStatsEnabled = OPTIMIZED_NODES_STATS_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(OPTIMIZED_NODES_STATS_SETTING, this::setOptimizedNodesStats);
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
Expand Down Expand Up @@ -622,7 +635,9 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
break;
}
}

if (optimizedNodesStatsEnabled) {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, flags.getLevels());
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}

Expand Down Expand Up @@ -1896,6 +1911,10 @@ private void setIdFieldDataEnabled(boolean value) {
this.idFieldDataEnabled = value;
}

/**
 * Settings-update consumer for {@code OPTIMIZED_NODES_STATS_SETTING}; flips the
 * flag that selects the levels-aware {@code NodeIndicesStats} path in {@code stats(...)}.
 */
private void setOptimizedNodesStats(boolean optimizedNodesStatsEnabled) {
    this.optimizedNodesStatsEnabled = optimizedNodesStatsEnabled;
}

private void updateDanglingIndicesInfo(Index index) {
assert DiscoveryNode.isDataNode(settings) : "dangling indices information should only be persisted on data nodes";
assert nodeWriteDanglingIndicesInfo : "writing dangling indices info is not enabled";
Expand Down
137 changes: 122 additions & 15 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices;

import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand Down Expand Up @@ -63,6 +64,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -75,22 +77,21 @@
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, CommonStats> statsByIndex;
private Map<Index, List<IndexShardStats>> statsByShard;

/**
 * Deserialization constructor. Wire layout: common stats, then (for peers on
 * or after V_2_13_0) an optional per-index stats map, then an optional
 * per-index list of shard stats — each optional map preceded by a boolean
 * presence marker. Must stay in lock-step with {@code writeTo}.
 *
 * @param in the stream to read from
 * @throws IOException on stream corruption or protocol mismatch
 */
public NodeIndicesStats(StreamInput in) throws IOException {
    stats = new CommonStats(in);
    if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
        // contains statsByIndex
        if (in.readBoolean()) {
            statsByIndex = new HashMap<>();
            readStatsByIndex(in);
        }
    }
    if (in.readBoolean()) {
        // Allocate before delegating, mirroring the statsByIndex handling above;
        // readStatsByShards populates this map and would otherwise NPE.
        statsByShard = new HashMap<>();
        readStatsByShards(in);
    }
}

Expand All @@ -112,6 +113,57 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
}
}

/**
 * Levels-aware constructor used when the optimized nodes-stats path is enabled.
 * Aggregates all shard-level stats into a single total, then retains detailed
 * stats only at the requested level.
 *
 * @param oldStats base common stats; shard stats are folded into it (see note below)
 * @param statsByShard detailed stats keyed by index, one entry per shard group
 * @param searchRequestStats attached to the aggregated search stats when present
 * @param levels requested output levels; may be null (no detailed maps retained)
 */
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
String[] levels
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
// NOTE(review): this mutates the caller-supplied oldStats instance in place —
// same behavior as the pre-existing constructor; confirm callers expect it.
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (levels != null) {
// Priority: "indices" wins over "shards" when both are requested (else-if),
// so at most one of statsByIndex / statsByShard ends up populated.
if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.indices::equals)) {
this.statsByIndex = createStatsByIndex(statsByShard);
} else if (Arrays.stream(levels).anyMatch(NodeIndicesStats.levels.shards::equals)) {
this.statsByShard = statsByShard;
}
}
}

/**
 * Reads the per-index common-stats map written by {@code writeStatsByIndex}.
 * Assumes the caller has already allocated {@code statsByIndex}.
 */
private void readStatsByIndex(StreamInput in) throws IOException {
    final int count = in.readVInt();
    for (int idx = 0; idx < count; idx++) {
        // Argument evaluation is left-to-right: Index is read before CommonStats,
        // matching the order they were written.
        statsByIndex.put(new Index(in), new CommonStats(in));
    }
}

/**
 * Reads the per-index shard-stats map written by {@code writeStatsByShards}.
 *
 * @param in the stream to read from
 * @throws IOException on stream corruption or protocol mismatch
 */
private void readStatsByShards(StreamInput in) throws IOException {
    int entries = in.readVInt();
    if (statsByShard == null) {
        // Defensive: the deserialization constructor does not visibly allocate the
        // map before delegating here; without this guard the put below would NPE.
        statsByShard = new HashMap<>();
    }
    for (int i = 0; i < entries; i++) {
        Index index = new Index(in);
        int indexShardListSize = in.readVInt();
        List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
        for (int j = 0; j < indexShardListSize; j++) {
            indexShardStats.add(new IndexShardStats(in));
        }
        statsByShard.put(index, indexShardStats);
    }
}

@Nullable
public StoreStats getStore() {
return stats.getStore();
Expand Down Expand Up @@ -195,7 +247,31 @@ public RecoveryStats getRecoveryStats() {
@Override
public void writeTo(StreamOutput out) throws IOException {
// Wire order must mirror NodeIndicesStats(StreamInput): common stats first,
// then each optional map preceded by a boolean presence marker.
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
// The per-index map is version-gated; peers before V_2_13_0 do not read it.
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
}
}

out.writeBoolean(statsByShard != null);
if (statsByShard != null) {
writeStatsByShards(out);
}
}

/**
 * Serializes the per-index common-stats map: a size prefix followed by
 * (Index, CommonStats) pairs. No-op when the map is absent.
 */
private void writeStatsByIndex(StreamOutput out) throws IOException {
    if (statsByIndex == null) {
        return;
    }
    out.writeVInt(statsByIndex.size());
    for (final Map.Entry<Index, CommonStats> indexEntry : statsByIndex.entrySet()) {
        indexEntry.getKey().writeTo(out);
        indexEntry.getValue().writeTo(out);
    }
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
Expand All @@ -222,16 +298,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(Fields.INDICES);
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
if (levels.indices.equals(level)) {
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
} else if (levels.shards.equals(level)) {
builder.startObject("shards");
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
Expand All @@ -251,7 +326,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
Expand All @@ -276,6 +351,14 @@ public List<IndexShardStats> getShardStats(Index index) {
}
}

/**
 * Returns the aggregated stats for the given index, or {@code null} when
 * index-level stats were not collected for this response.
 */
public CommonStats getIndexStats(Index index) {
    return statsByIndex == null ? null : statsByIndex.get(index);
}

/**
* Fields used for parsing and toXContent
*
Expand All @@ -284,4 +367,28 @@ public List<IndexShardStats> getShardStats(Index index) {
static final class Fields {
// XContent key under which the node's indices stats are nested.
static final String INDICES = "indices";
}

/**
 * Aggregation levels supported by the nodes-stats output. Constants are
 * deliberately lower-case so that {@code name()} matches the raw "level"
 * request-parameter values exactly.
 */
public enum levels {
    nodes,
    indices,
    shards;

    @Override
    public String toString() {
        return name();
    }

    /**
     * Convenience comparison against a raw level string.
     * NOTE: this overloads (does not override) {@link Object#equals}; callers
     * passing a {@code String} intentionally resolve to this method.
     */
    public boolean equals(String value) {
        return name().equals(value);
    }
}
}

0 comments on commit 59a0f5d

Please sign in to comment.