Skip to content

Commit

Permalink
Add total_rejections to replication stats
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Oct 11, 2023
1 parent acb2c2d commit 9e1a497
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,9 @@ public static final IndexShard newIndexShard(
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId,
null,
null

);
}

Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ public synchronized IndexShard createShard(
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final SegmentReplicationPressureService segmentReplicationPressureService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -522,7 +523,8 @@ public synchronized IndexShard createShard(
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId(),
(RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory
(RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory,
segmentReplicationPressureService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
31 changes: 30 additions & 1 deletion server/src/main/java/org/opensearch/index/ReplicationStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.index;

import org.opensearch.Version;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

Expand All @@ -29,17 +31,26 @@ public class ReplicationStats implements ToXContentFragment, Writeable {
public long maxBytesBehind;
public long maxReplicationLag;
public long totalBytesBehind;
public long totalRejections;
public ShardId shardId;

public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) {
public ReplicationStats(ShardId shardId, long maxBytesBehind, long totalBytesBehind, long maxReplicationLag, long totalRejections) {
this.shardId = shardId;
this.maxBytesBehind = maxBytesBehind;
this.totalBytesBehind = totalBytesBehind;
this.maxReplicationLag = maxReplicationLag;
this.totalRejections = totalRejections;
}

public ReplicationStats(StreamInput in) throws IOException {
this.maxBytesBehind = in.readVLong();
this.totalBytesBehind = in.readVLong();
this.maxReplicationLag = in.readVLong();
// TODO: change to V_2_11_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.totalRejections = in.readVLong();
this.shardId = in.readOptionalWriteable(ShardId::new);
}
}

public ReplicationStats() {
Expand All @@ -51,6 +62,10 @@ public void add(ReplicationStats other) {
maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind);
totalBytesBehind += other.totalBytesBehind;
maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag);
// TODO
if (this.shardId != other.shardId) {
totalRejections += other.totalRejections;
}
}
}

Expand All @@ -66,11 +81,20 @@ public long getMaxReplicationLag() {
return this.maxReplicationLag;
}

public long getTotalRejections() {
return totalRejections;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxBytesBehind);
out.writeVLong(totalBytesBehind);
out.writeVLong(maxReplicationLag);
// TODO: change to V_2_11_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeVLong(totalRejections);
out.writeOptionalWriteable(shardId);
}
}

@Override
Expand All @@ -79,6 +103,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString());
builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString());
builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag));

builder.startObject("pressure");
builder.field("total_rejections", totalRejections);
builder.endObject();

builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.IndexCache;
Expand Down Expand Up @@ -343,6 +344,7 @@ Runnable getGlobalCheckpointSyncer() {

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
private final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory;
private final SegmentReplicationPressureService segmentReplicationPressureService;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -374,7 +376,8 @@ public IndexShard(
// Wiring a directory factory here breaks some intended abstractions, but this remote directory
// factory is used not as a Lucene directory but instead to copy files from a remote store when
// restoring a shallow snapshot.
@Nullable final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory
@Nullable final RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory,
final SegmentReplicationPressureService segmentReplicationPressureService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -471,6 +474,7 @@ public boolean shouldCache(Query query) {
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
this.remoteSegmentStoreDirectoryFactory = remoteSegmentStoreDirectoryFactory;
this.segmentReplicationPressureService = segmentReplicationPressureService;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3001,7 +3005,8 @@ public ReplicationStats getReplicationStats() {
.mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis)
.max()
.orElse(0L);
return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);
long totalRejections = segmentReplicationPressureService.getStatsForShard(this).getRejectedRequestCount();
return new ReplicationStats(shardId, maxBytesBehind, totalBytesBehind, maxReplicationLag, totalRejections);
}
return new ReplicationStats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.cache.request.ShardRequestCache;
import org.opensearch.index.engine.CommitStats;
Expand Down Expand Up @@ -985,7 +986,8 @@ public IndexShard createShard(
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final SegmentReplicationPressureService segmentReplicationPressureService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
Expand All @@ -997,7 +999,8 @@ public IndexShard createShard(
globalCheckpointSyncer,
retentionLeaseSyncer,
checkpointPublisher,
remoteStoreStatsTrackerFactory
remoteStoreStatsTrackerFactory,
segmentReplicationPressureService
);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.index.IndexComponent;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.seqno.GlobalCheckpointSyncAction;
import org.opensearch.index.seqno.ReplicationTracker;
Expand Down Expand Up @@ -150,6 +151,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple

private final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory;

private final SegmentReplicationPressureService segmentReplicationPressureService;

@Inject
public IndicesClusterStateService(
final Settings settings,
Expand All @@ -169,7 +172,8 @@ public IndicesClusterStateService(
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final SegmentReplicationPressureService segmentReplicationPressureService
) {
this(
settings,
Expand All @@ -189,7 +193,8 @@ public IndicesClusterStateService(
primaryReplicaSyncer,
globalCheckpointSyncAction::updateGlobalCheckpointForShard,
retentionLeaseSyncer,
remoteStoreStatsTrackerFactory
remoteStoreStatsTrackerFactory,
segmentReplicationPressureService
);
}

Expand All @@ -212,7 +217,8 @@ public IndicesClusterStateService(
final PrimaryReplicaSyncer primaryReplicaSyncer,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final SegmentReplicationPressureService segmentReplicationPressureService
) {
this.settings = settings;
this.checkpointPublisher = checkpointPublisher;
Expand All @@ -237,6 +243,7 @@ public IndicesClusterStateService(
this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer);
this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true);
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
this.segmentReplicationPressureService = segmentReplicationPressureService;
}

@Override
Expand Down Expand Up @@ -679,7 +686,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
retentionLeaseSyncer,
nodes.getLocalNode(),
sourceNode,
remoteStoreStatsTrackerFactory
remoteStoreStatsTrackerFactory,
segmentReplicationPressureService
);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
Expand Down Expand Up @@ -1039,7 +1047,8 @@ T createShard(
RetentionLeaseSyncer retentionLeaseSyncer,
DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode,
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
SegmentReplicationPressureService segmentReplicationPressureService
) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
s -> {},
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -264,7 +265,8 @@ public MockIndexShard createShard(
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final SegmentReplicationPressureService segmentReplicationPressureService
) throws IOException {
failRandomly();
RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ private IndicesClusterStateService createIndicesClusterStateService(
primaryReplicaSyncer,
s -> {},
RetentionLeaseSyncer.EMPTY,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2136,7 +2136,8 @@ public void onFailure(final Exception e) {
),
RetentionLeaseSyncer.EMPTY,
SegmentReplicationCheckpointPublisher.EMPTY,
mock(RemoteStoreStatsTrackerFactory.class)
mock(RemoteStoreStatsTrackerFactory.class),
mock(SegmentReplicationPressureService.class)
);

final SystemIndices systemIndices = new SystemIndices(emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ protected IndexShard newShard(
remoteStoreStatsTrackerFactory,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
"dummy-node",
null,
null
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
Expand Down

0 comments on commit 9e1a497

Please sign in to comment.