Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Oct 10, 2023
1 parent 8bb11a6 commit 669a75f
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.SegmentReplicationPressureStats;
import org.opensearch.index.remote.RemoteStorePressureStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
Expand Down Expand Up @@ -142,6 +144,12 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchPipelineStats searchPipelineStats;

@Nullable
private RemoteStorePressureStats remoteStorePressureStats;

@Nullable
private SegmentReplicationPressureStats segmentReplicationPressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -198,6 +206,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
// TODO: change to V_2_11_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.CURRENT)) {
remoteStorePressureStats = in.readOptionalWriteable(RemoteStorePressureStats::new);
segmentReplicationPressureStats = in.readOptionalWriteable(SegmentReplicationPressureStats::new);
}
}

public NodeStats(
Expand All @@ -224,7 +237,9 @@ public NodeStats(
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable RemoteStorePressureStats remoteStorePressureStats,
@Nullable SegmentReplicationPressureStats segmentReplicationPressureStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -250,6 +265,8 @@ public NodeStats(
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
this.remoteStorePressureStats = remoteStorePressureStats;
this.segmentReplicationPressureStats = segmentReplicationPressureStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -387,6 +404,16 @@ public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;
}

@Nullable
public RemoteStorePressureStats getRemoteStorePressureStats() {
return remoteStorePressureStats;
}

@Nullable
public SegmentReplicationPressureStats getSegmentReplicationPressureStats() {
return segmentReplicationPressureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -430,6 +457,11 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
// TODO: change to V_2_11_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(remoteStorePressureStats);
out.writeOptionalWriteable(segmentReplicationPressureStats);
}
}

@Override
Expand Down Expand Up @@ -520,6 +552,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}
if (getRemoteStorePressureStats() != null) {
getRemoteStorePressureStats().toXContent(builder, params);
}
if (getSegmentReplicationPressureStats() != null) {
getSegmentReplicationPressureStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public SegmentReplicationStats nodeStats() {
return tracker.getStats();
}

public SegmentReplicationPressureStats pressureStats() {
return new SegmentReplicationPressureStats(tracker.getStatsForShard());
}

public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) {
return tracker.getStatsForShard(indexShard);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Stats for Segment Replication Backpressure
*
* @opensearch.internal
*/
public class SegmentReplicationPressureStats implements Writeable, ToXContentFragment {
private final long totalRejections;

public SegmentReplicationPressureStats(long totalRejections) {
this.totalRejections = totalRejections;
}

public SegmentReplicationPressureStats(StreamInput in) throws IOException {
totalRejections = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalRejections);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("segment_replication_pressure");
builder.field("total_rejections", totalRejections);
builder.endObject(); // segment_replication_pressure

return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ public void validateSegmentsUploadLag(ShardId shardId) {
}
}

RemoteStorePressureStats pressureStats() {
return new RemoteStorePressureStats(
remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).getRejectionCount()
);
}

/**
* Abstract class for validating if lag is acceptable or not.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.remote;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Stats for Remote Store Backpressure
*
* @opensearch.internal
*/
public class RemoteStorePressureStats implements Writeable, ToXContentFragment {
private final long totalRejections;

public RemoteStorePressureStats(long totalRejections) {
this.totalRejections = totalRejections;
}

public RemoteStorePressureStats(StreamInput in) throws IOException {
totalRejections = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalRejections);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("remote_store_pressure");
builder.field("total_rejections", totalRejections);
builder.endObject(); // remote_store_pressure

return builder;
}
}
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ public NodeStats stats(
boolean weightedRoutingStats,
boolean fileCacheStats,
boolean taskCancellation,
boolean searchPipelineStats
boolean searchPipelineStats,
boolean remoteStorePressureStats,
boolean segmentReplicationPressureStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -245,7 +247,9 @@ public NodeStats stats(
weightedRoutingStats ? WeightedRoutingStats.getInstance() : null,
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null
searchPipelineStats ? this.searchPipelineService.stats() : null,
remoteStorePressureStats ? <rs-pressure-service>. : null,
segmentReplicationPressureStats ? <sr-pressure-service>. : null
);
}

Expand Down

0 comments on commit 669a75f

Please sign in to comment.