From 669a75f60b337d3e7b3d168808b0eb577a62507b Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Tue, 10 Oct 2023 11:34:54 +0530 Subject: [PATCH] WIP Signed-off-by: Bhumika Saini --- .../admin/cluster/node/stats/NodeStats.java | 40 +++++++++++++++- .../SegmentReplicationPressureService.java | 4 ++ .../SegmentReplicationPressureStats.java | 48 +++++++++++++++++++ .../remote/RemoteStorePressureService.java | 6 +++ .../remote/RemoteStorePressureStats.java | 48 +++++++++++++++++++ .../java/org/opensearch/node/NodeService.java | 8 +++- 6 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index dd36b3b8db3ab..bd6a3b389b358 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -46,6 +46,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; +import org.opensearch.index.SegmentReplicationPressureStats; +import org.opensearch.index.remote.RemoteStorePressureStats; import org.opensearch.index.stats.IndexingPressureStats; import org.opensearch.index.stats.ShardIndexingPressureStats; import org.opensearch.index.store.remote.filecache.FileCacheStats; @@ -142,6 +144,12 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private SearchPipelineStats searchPipelineStats; + @Nullable + private RemoteStorePressureStats remoteStorePressureStats; + + @Nullable + private SegmentReplicationPressureStats segmentReplicationPressureStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -198,6 +206,11 @@ public NodeStats(StreamInput in) throws IOException { } else { searchPipelineStats = null; } + // TODO: change to V_2_11_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.CURRENT)) { + remoteStorePressureStats = in.readOptionalWriteable(RemoteStorePressureStats::new); + segmentReplicationPressureStats = in.readOptionalWriteable(SegmentReplicationPressureStats::new); + } } public NodeStats( @@ -224,7 +237,9 @@ public NodeStats( @Nullable WeightedRoutingStats weightedRoutingStats, @Nullable FileCacheStats fileCacheStats, @Nullable TaskCancellationStats taskCancellationStats, - @Nullable SearchPipelineStats searchPipelineStats + @Nullable SearchPipelineStats searchPipelineStats, + @Nullable RemoteStorePressureStats remoteStorePressureStats, + @Nullable SegmentReplicationPressureStats segmentReplicationPressureStats ) { super(node); this.timestamp = timestamp; @@ -250,6 +265,8 @@ public NodeStats( this.fileCacheStats = fileCacheStats; this.taskCancellationStats = taskCancellationStats; this.searchPipelineStats = searchPipelineStats; + this.remoteStorePressureStats = remoteStorePressureStats; + this.segmentReplicationPressureStats = segmentReplicationPressureStats; } public long getTimestamp() { @@ -387,6 +404,16 @@ public SearchPipelineStats getSearchPipelineStats() { return searchPipelineStats; } + @Nullable + public RemoteStorePressureStats getRemoteStorePressureStats() { + return remoteStorePressureStats; + } + + @Nullable + public SegmentReplicationPressureStats getSegmentReplicationPressureStats() { + return segmentReplicationPressureStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -430,6 +457,11 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(searchPipelineStats); } + // TODO: change to V_2_11_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(remoteStorePressureStats); + out.writeOptionalWriteable(segmentReplicationPressureStats); + } } @Override @@ -520,6 +552,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getSearchPipelineStats() != null) { getSearchPipelineStats().toXContent(builder, params); } + if (getRemoteStorePressureStats() != null) { + getRemoteStorePressureStats().toXContent(builder, params); + } + if (getSegmentReplicationPressureStats() != null) { + getSegmentReplicationPressureStats().toXContent(builder, params); + } return builder; } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 4284daf9ffef4..8b06caa2e928b 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -179,6 +179,10 @@ public SegmentReplicationStats nodeStats() { return tracker.getStats(); } + public SegmentReplicationPressureStats pressureStats() { + return new SegmentReplicationPressureStats(tracker.getStatsForShard()); + } + public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { return tracker.getStatsForShard(indexShard); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java new file mode 100644 index 0000000000000..7ec8720b41a4b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureStats.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Stats for Segment Replication Backpressure + * + * @opensearch.internal + */ +public class SegmentReplicationPressureStats implements Writeable, ToXContentFragment { + private final long totalRejections; + + public SegmentReplicationPressureStats(long totalRejections) { + this.totalRejections = totalRejections; + } + + public SegmentReplicationPressureStats(StreamInput in) throws IOException { + totalRejections = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("segment_replication_pressure"); + builder.field("total_rejections", totalRejections); + builder.endObject(); // segment_replication_pressure + + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 2920b33921869..7a1b344696bbe 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -83,6 +83,12 @@ public void validateSegmentsUploadLag(ShardId shardId) { } } + RemoteStorePressureStats pressureStats() { + return new RemoteStorePressureStats( + remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).getRejectionCount() + ); + } + /** * Abstract class for validating if lag is acceptable or not. * diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java new file mode 100644 index 0000000000000..eb5e77a04457b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureStats.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Stats for Remote Store Backpressure + * + * @opensearch.internal + */ +public class RemoteStorePressureStats implements Writeable, ToXContentFragment { + private final long totalRejections; + + public RemoteStorePressureStats(long totalRejections) { + this.totalRejections = totalRejections; + } + + public RemoteStorePressureStats(StreamInput in) throws IOException { + totalRejections = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalRejections); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("remote_store_pressure"); + builder.field("total_rejections", totalRejections); + builder.endObject(); // remote_store_pressure + + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 2688b894cb9a7..e3572c08a1fb1 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -217,7 +217,9 @@ public NodeStats stats( boolean weightedRoutingStats, boolean fileCacheStats, boolean taskCancellation, - boolean searchPipelineStats + boolean searchPipelineStats, + boolean remoteStorePressureStats, + boolean segmentReplicationPressureStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -245,7 +247,9 @@ public NodeStats stats( weightedRoutingStats ? WeightedRoutingStats.getInstance() : null, fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null, taskCancellation ? this.taskCancellationMonitoringService.stats() : null, - searchPipelineStats ? this.searchPipelineService.stats() : null + searchPipelineStats ? this.searchPipelineService.stats() : null, + remoteStorePressureStats ? . : null, + segmentReplicationPressureStats ? . : null ); }