From 5a25350e7e2f4224d50744a8fb1def65b71f0356 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Wed, 16 Aug 2023 14:54:22 +0530 Subject: [PATCH] [Remote Store] Rename RemoteRefreshSegmentPressureSettings and RemoteRefreshSegmentPressureService (#9253) --------- Signed-off-by: Bhumika Saini --- .../RemoteSegmentStatsFromNodesStatsIT.java | 52 +++---- .../RemoteStoreBackpressureIT.java | 8 +- .../RemoteStoreRefreshListenerIT.java | 2 +- .../remotestore/RemoteStoreStatsIT.java | 66 ++++----- .../remotestore/stats/RemoteStoreStats.java | 20 +-- .../TransportRemoteStoreStatsAction.java | 10 +- .../action/bulk/TransportShardBulkAction.java | 13 +- .../common/settings/ClusterSettings.java | 16 +- .../org/opensearch/index/IndexService.java | 6 +- .../remote/RemoteSegmentTransferTracker.java | 57 +++++++- ...e.java => RemoteStorePressureService.java} | 20 +-- ....java => RemoteStorePressureSettings.java} | 6 +- .../opensearch/index/shard/IndexShard.java | 17 +-- .../store/DirectoryFileTransferTracker.java | 29 ++++ .../org/opensearch/indices/IndicesModule.java | 4 +- .../opensearch/indices/IndicesService.java | 6 +- .../cluster/IndicesClusterStateService.java | 18 +-- .../stats/RemoteStoreStatsTestHelper.java | 54 +++---- .../stats/RemoteStoreStatsTests.java | 138 +++--------------- .../TransportRemoteStoreStatsActionTests.java | 6 +- .../bulk/TransportShardBulkActionTests.java | 10 +- .../RemoteSegmentTransferTrackerTests.java | 47 +++--- ...a => RemoteStorePressureServiceTests.java} | 16 +- ... => RemoteStorePressureSettingsTests.java} | 86 ++++++----- .../index/shard/IndexShardTests.java | 3 +- .../RemoteStoreRefreshListenerTests.java | 46 +++--- ...actIndicesClusterStateServiceTestCase.java | 4 +- .../snapshots/SnapshotResiliencyTests.java | 6 +- .../index/shard/IndexShardTestCase.java | 12 +- 29 files changed, 374 insertions(+), 404 deletions(-) rename server/src/main/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureService.java => RemoteStorePressureService.java} (93%) rename server/src/main/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureSettings.java => RemoteStorePressureSettings.java} (98%) rename server/src/test/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureServiceTests.java => RemoteStorePressureServiceTests.java} (90%) rename server/src/test/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureSettingsTests.java => RemoteStorePressureSettingsTests.java} (65%) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java index a92c80b9cf840..e34c3310f1690 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java @@ -74,22 +74,22 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { .prepareRemoteStoreStats(firstIndex, "0") .setLocal(true) .get(); - cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin() .cluster() .prepareRemoteStoreStats(secondIndex, "0") .setLocal(true) .get(); - cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); // Fetch nodes stats NodesStatsResponse nodesStatsResponse = client().admin() @@ -188,34 +188,34 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .prepareRemoteStoreStats(firstIndex, "0") .setLocal(true) .get(); - cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; + cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; cumulativeDownloadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; + .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; cumulativeDownloadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getStats().directoryFileTransferTrackerStats.transferredBytesStarted; + .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getStats().directoryFileTransferTrackerStats.transferredBytesFailed; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin() .cluster() .prepareRemoteStoreStats(secondIndex, "0") .setLocal(true) .get(); - cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().uploadBytesFailed; + cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; + cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; + cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; cumulativeDownloadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; + .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; cumulativeDownloadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getStats().directoryFileTransferTrackerStats.transferredBytesStarted; + .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getStats().directoryFileTransferTrackerStats.transferredBytesFailed; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getStats().refreshTimeLagMs); + .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; + max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); // Fetch nodes stats NodesStatsResponse nodesStatsResponse = client().admin() diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java index 703cb7da3d009..c7b1a5c5247d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -29,8 +29,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT; -import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; +import static org.opensearch.index.remote.RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT; +import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { @@ -120,10 +120,10 @@ private RemoteSegmentTransferTracker.Stats stats() { RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); List matches = Arrays.stream(response.getRemoteStoreStats()) - .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); - return matches.get(0).getStats(); + return matches.get(0).getSegmentStats(); } private void indexDocAndRefresh(BytesReference source, int iterations) { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 4005e6359a2f7..b97e93f323fb2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -22,7 +22,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; +import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index bd546a01b0b88..c932a7c96762f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -69,11 +69,11 @@ public void testStatsResponseFromAllNodes() { assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); List matches = Arrays.stream(response.getRemoteStoreStats()) - .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); - RemoteSegmentTransferTracker.Stats stats = matches.get(0).getStats(); - validateUploadStats(stats); + RemoteSegmentTransferTracker.Stats stats = matches.get(0).getSegmentStats(); + validateSegmentUploadStats(stats); assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); } @@ -86,18 +86,18 @@ public void testStatsResponseFromAllNodes() { assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); List matches = Arrays.stream(response.getRemoteStoreStats()) - .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(2, matches.size()); for (RemoteStoreStats stat : matches) { ShardRouting routing = stat.getShardRouting(); validateShardRouting(routing); - RemoteSegmentTransferTracker.Stats stats = stat.getStats(); + RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats(); if (routing.primary()) { - validateUploadStats(stats); + validateSegmentUploadStats(stats); assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); } else { - validateDownloadStats(stats); + validateSegmentDownloadStats(stats); assertEquals(0, stats.totalUploadsStarted); } } @@ -124,9 +124,9 @@ public void testStatsResponseAllShards() { RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); assertEquals(3, response.getSuccessfulShards()); assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 3); - RemoteSegmentTransferTracker.Stats stats = response.getRemoteStoreStats()[0].getStats(); - validateUploadStats(stats); - assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats(); + validateSegmentUploadStats(segmentStats); + assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted); // Step 3 - Enable replicas on the existing indices and ensure that download // stats are being populated as well @@ -137,13 +137,13 @@ public void testStatsResponseAllShards() { for (RemoteStoreStats stat : response.getRemoteStoreStats()) { ShardRouting routing = stat.getShardRouting(); validateShardRouting(routing); - stats = stat.getStats(); + segmentStats = stat.getSegmentStats(); if (routing.primary()) { - validateUploadStats(stats); - assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + validateSegmentUploadStats(segmentStats); + assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted); } else { - validateDownloadStats(stats); - assertEquals(0, stats.totalUploadsStarted); + validateSegmentDownloadStats(segmentStats); + assertEquals(0, segmentStats.totalUploadsStarted); } } @@ -171,9 +171,9 @@ public void testStatsResponseFromLocalNode() { RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); assertEquals(1, response.getSuccessfulShards()); assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 1); - RemoteSegmentTransferTracker.Stats stats = response.getRemoteStoreStats()[0].getStats(); - validateUploadStats(stats); - assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats(); + validateSegmentUploadStats(segmentStats); + assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted); } changeReplicaCountAndEnsureGreen(1); for (String node : nodes) { @@ -187,12 +187,12 @@ public void testStatsResponseFromLocalNode() { for (RemoteStoreStats stat : response.getRemoteStoreStats()) { ShardRouting routing = stat.getShardRouting(); validateShardRouting(routing); - RemoteSegmentTransferTracker.Stats stats = stat.getStats(); + RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats(); if (routing.primary()) { - validateUploadStats(stats); + validateSegmentUploadStats(stats); assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); } else { - validateDownloadStats(stats); + validateSegmentDownloadStats(stats); assertEquals(0, stats.totalUploadsStarted); } } @@ -225,7 +225,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) .collect(Collectors.toList()) .get(0) - .getStats(); + .getSegmentStats(); assertTrue( zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded && zeroStatePrimaryStats.totalUploadsSucceeded == 1 @@ -241,7 +241,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) .collect(Collectors.toList()) .get(0) - .getStats(); + .getSegmentStats(); assertTrue( zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0 && zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 @@ -266,8 +266,8 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) .collect(Collectors.toList()); assertEquals(1, replicaStatsList.size()); - RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getStats(); - RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getStats(); + RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getSegmentStats(); + RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getSegmentStats(); // Assert Upload syncs - zero state uploads == download syncs assertTrue(primaryStats.totalUploadsStarted > 0); assertTrue(primaryStats.totalUploadsSucceeded > 0); @@ -318,7 +318,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) .collect(Collectors.toList()) .get(0) - .getStats(); + .getSegmentStats(); assertTrue( zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded && zeroStatePrimaryStats.totalUploadsSucceeded == 1 @@ -335,8 +335,8 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr .collect(Collectors.toList()); zeroStateReplicaStats.forEach(stats -> { assertTrue( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0 - && stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 + stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0 + && stats.getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 ); }); @@ -356,7 +356,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr // Assert that stats for primary shard and replica shard set are equal for (RemoteStoreStats eachStatsObject : response.getRemoteStoreStats()) { - RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getStats(); + RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getSegmentStats(); if (eachStatsObject.getShardRouting().primary()) { uploadBytesStarted = stats.uploadBytesStarted; uploadBytesSucceeded = stats.uploadBytesSucceeded; @@ -491,7 +491,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException { RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); Arrays.stream(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(statObject -> { - RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getStats(); + RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats(); // Assert that we have both upload and download stats for the index assertTrue( segmentTracker.totalUploadsStarted > 0 && segmentTracker.totalUploadsSucceeded > 0 && segmentTracker.totalUploadsFailed == 0 @@ -520,7 +520,7 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce .get() .getRemoteStoreStats(); Arrays.stream(remoteStoreStats).forEach(statObject -> { - RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getStats(); + RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats(); if (statObject.getShardRouting().primary()) { assertTrue( segmentTracker.totalUploadsSucceeded == 1 @@ -567,7 +567,7 @@ private void relocateShard(int shardId, String sourceNode, String destNode) { ensureGreen(INDEX_NAME); } - private void validateUploadStats(RemoteSegmentTransferTracker.Stats stats) { + private void validateSegmentUploadStats(RemoteSegmentTransferTracker.Stats stats) { assertEquals(0, stats.refreshTimeLagMs); assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber); assertTrue(stats.uploadBytesStarted > 0); @@ -584,7 +584,7 @@ private void validateUploadStats(RemoteSegmentTransferTracker.Stats stats) { assertTrue(stats.uploadTimeMovingAverage > 0); } - private void validateDownloadStats(RemoteSegmentTransferTracker.Stats stats) { + private void validateSegmentDownloadStats(RemoteSegmentTransferTracker.Stats stats) { assertTrue(stats.directoryFileTransferTrackerStats.lastTransferTimestampMs > 0); assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesStarted > 0); assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index 6b4c9a26ab19b..7ef0b379a7757 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -24,7 +24,9 @@ * @opensearch.internal */ public class RemoteStoreStats implements Writeable, ToXContentFragment { - + /** + * Stats related to Remote Segment Store operations + */ private final RemoteSegmentTransferTracker.Stats remoteSegmentShardStats; private final ShardRouting shardRouting; @@ -39,7 +41,7 @@ public RemoteStoreStats(StreamInput in) throws IOException { this.shardRouting = new ShardRouting(in); } - public RemoteSegmentTransferTracker.Stats getStats() { + public RemoteSegmentTransferTracker.Stats getSegmentStats() { return remoteSegmentShardStats; } @@ -55,16 +57,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(SubFields.DOWNLOAD); // Ensuring that we are not showing 0 metrics to the user if (remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted != 0) { - buildDownloadStats(builder); + buildSegmentDownloadStats(builder); } - builder.endObject(); + builder.endObject(); // segment.download builder.startObject(SubFields.UPLOAD); // Ensuring that we are not showing 0 metrics to the user if (remoteSegmentShardStats.totalUploadsStarted != 0) { - buildUploadStats(builder); + buildSegmentUploadStats(builder); } - builder.endObject(); - builder.endObject(); + builder.endObject(); // segment.upload + builder.endObject(); // segment return builder.endObject(); } @@ -74,7 +76,7 @@ public void writeTo(StreamOutput out) throws IOException { shardRouting.writeTo(out); } - private void buildUploadStats(XContentBuilder builder) throws IOException { + private void buildSegmentUploadStats(XContentBuilder builder) throws IOException { builder.field(UploadStatsFields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentShardStats.localRefreshClockTimeMs) .field(UploadStatsFields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentShardStats.remoteRefreshClockTimeMs) .field(UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentShardStats.refreshTimeLagMs) @@ -104,7 +106,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException { builder.endObject(); } - private void buildDownloadStats(XContentBuilder builder) throws IOException { + private void buildSegmentDownloadStats(XContentBuilder builder) throws IOException { builder.field( DownloadStatsFields.LAST_SYNC_TIMESTAMP, remoteSegmentShardStats.directoryFileTransferTrackerStats.lastTransferTimestampMs diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index 37835a5add3d6..6792ff339b5a4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -23,7 +23,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.index.IndexService; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardNotFoundException; @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct private final IndicesService indicesService; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; @Inject public TransportRemoteStoreStatsAction( @@ -59,7 +59,7 @@ public TransportRemoteStoreStatsAction( IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + RemoteStorePressureService remoteStorePressureService ) { super( RemoteStoreStatsAction.NAME, @@ -71,7 +71,7 @@ public TransportRemoteStoreStatsAction( ThreadPool.Names.MANAGEMENT ); this.indicesService = indicesService; - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } /** @@ -153,7 +153,7 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard throw new ShardNotFoundException(indexShard.shardId()); } - RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker( + RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker( indexShard.shardId() ); assert Objects.nonNull(remoteSegmentTransferTracker); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index aee8819606e93..a0d973070eda6 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -89,7 +89,7 @@ import org.opensearch.index.mapper.MapperException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; @@ -137,7 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -509,7 +509,7 @@ public synchronized IndexShard createShard( translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore, - remoteRefreshSegmentPressureService + remoteStorePressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 16bcb0a7721bc..d89374e5e6b26 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -24,9 +24,10 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Map; +import java.util.HashSet; import java.util.Set; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -675,5 +676,59 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(bytesLag); out.writeOptionalWriteable(directoryFileTransferTrackerStats); } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + Stats other = (Stats) obj; + + return this.shardId.toString().equals(other.shardId.toString()) + && this.localRefreshClockTimeMs == other.localRefreshClockTimeMs + && this.remoteRefreshClockTimeMs == other.remoteRefreshClockTimeMs + && this.refreshTimeLagMs == other.refreshTimeLagMs + && this.localRefreshNumber == other.localRefreshNumber + && this.remoteRefreshNumber == other.remoteRefreshNumber + && this.uploadBytesStarted == other.uploadBytesStarted + && this.uploadBytesFailed == other.uploadBytesFailed + && this.uploadBytesSucceeded == other.uploadBytesSucceeded + && this.totalUploadsStarted == other.totalUploadsStarted + && this.totalUploadsFailed == other.totalUploadsFailed + && this.totalUploadsSucceeded == other.totalUploadsSucceeded + && this.rejectionCount == other.rejectionCount + && this.consecutiveFailuresCount == other.consecutiveFailuresCount + && this.lastSuccessfulRemoteRefreshBytes == other.lastSuccessfulRemoteRefreshBytes + && Double.compare(this.uploadBytesMovingAverage, other.uploadBytesMovingAverage) == 0 + && Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0 + && Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0 + && this.bytesLag == other.bytesLag + && this.directoryFileTransferTrackerStats.equals(other.directoryFileTransferTrackerStats); + } + + @Override + public int hashCode() { + return Objects.hash( + shardId, + localRefreshClockTimeMs, + remoteRefreshClockTimeMs, + refreshTimeLagMs, + localRefreshNumber, + remoteRefreshNumber, + uploadBytesStarted, + uploadBytesFailed, + uploadBytesSucceeded, + totalUploadsStarted, + totalUploadsFailed, + totalUploadsSucceeded, + rejectionCount, + consecutiveFailuresCount, + lastSuccessfulRemoteRefreshBytes, + uploadBytesMovingAverage, + uploadBytesPerSecMovingAverage, + uploadTimeMovingAverage, + bytesLag, + directoryFileTransferTrackerStats + ); + } } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java similarity index 93% rename from server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java rename to server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 6f6364ac3b8a6..f2bbb94fd02ee 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -30,9 +30,9 @@ * * @opensearch.internal */ -public class RemoteRefreshSegmentPressureService implements IndexEventListener { +public class RemoteStorePressureService implements IndexEventListener { - private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class); + private static final Logger logger = LogManager.getLogger(RemoteStorePressureService.class); /** * Keeps map of remote-backed index shards and their corresponding backpressure tracker. @@ -42,13 +42,13 @@ public class RemoteRefreshSegmentPressureService implements IndexEventListener { /** * Remote refresh segment pressure settings which is used for creation of the backpressure tracker and as well as rejection. */ - private final RemoteRefreshSegmentPressureSettings pressureSettings; + private final RemoteStorePressureSettings pressureSettings; private final List lagValidators; @Inject - public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) { - pressureSettings = new RemoteRefreshSegmentPressureSettings(clusterService, settings, this); + public RemoteStorePressureService(ClusterService clusterService, Settings settings) { + pressureSettings = new RemoteStorePressureSettings(clusterService, settings, this); lagValidators = Arrays.asList( new ConsecutiveFailureValidator(pressureSettings), new BytesLagValidator(pressureSettings), @@ -146,9 +146,9 @@ void updateMovingAverageWindowSize(BiConsumer translogFactorySupplier; private final boolean isTimeSeriesIndex; - - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; private final List internalRefreshListener = new ArrayList<>(); @@ -362,7 +361,7 @@ public IndexShard( final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -457,7 +456,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } public ThreadPool getThreadPool() { @@ -547,8 +546,8 @@ public QueryCachingPolicy getQueryCachingPolicy() { } /** Only used for testing **/ - protected RemoteRefreshSegmentPressureService getRemoteRefreshSegmentPressureService() { - return remoteRefreshSegmentPressureService; + protected RemoteStorePressureService getRemoteStorePressureService() { + return remoteStorePressureService; } @Override @@ -1387,7 +1386,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu // Populate remote_store stats only if the index is remote store backed if (indexSettings.isRemoteStoreEnabled()) { segmentsStats.addRemoteSegmentStats( - new RemoteSegmentStats(remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId).stats()) + new RemoteSegmentStats(remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId).stats()) ); } return segmentsStats; @@ -3697,7 +3696,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro this, // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()) + remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId()) ) ); } diff --git a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java index 7e0e231d7bad9..5e12517becaf2 100644 --- a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java @@ -14,6 +14,7 @@ import org.opensearch.core.common.io.stream.Writeable; import java.io.IOException; +import java.util.Objects; /** * Tracks the amount of bytes transferred between two {@link org.apache.lucene.store.Directory} instances @@ -191,5 +192,33 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(lastSuccessfulTransferInBytes); out.writeDouble(transferredBytesPerSecMovingAverage); } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + Stats stats = (Stats) obj; + + return transferredBytesStarted == stats.transferredBytesStarted + && transferredBytesFailed == stats.transferredBytesFailed + && transferredBytesSucceeded == stats.transferredBytesSucceeded + && lastTransferTimestampMs == stats.lastTransferTimestampMs + && Double.compare(stats.transferredBytesMovingAverage, transferredBytesMovingAverage) == 0 + && lastSuccessfulTransferInBytes == stats.lastSuccessfulTransferInBytes + && Double.compare(stats.transferredBytesPerSecMovingAverage, transferredBytesPerSecMovingAverage) == 0; + } + + @Override + public int hashCode() { + return Objects.hash( + transferredBytesStarted, + transferredBytesFailed, + transferredBytesSucceeded, + lastTransferTimestampMs, + transferredBytesMovingAverage, + lastSuccessfulTransferInBytes, + transferredBytesPerSecMovingAverage + ); + } } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 9d2eef5f67a86..a10c93d9c1580 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -70,7 +70,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -289,7 +289,7 @@ protected void configure() { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton(); + bind(RemoteStorePressureService.class).asEagerSingleton(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f1cd0d1d1f291..e7c1502191aee 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -119,7 +119,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -927,7 +927,7 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -939,7 +939,7 @@ public IndexShard createShard( globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, - remoteRefreshSegmentPressureService + remoteStorePressureService ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index a527ae09d7666..2daae9e7d81ea 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -65,7 +65,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -149,7 +149,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; @Inject public IndicesClusterStateService( @@ -170,7 +170,7 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) { this( settings, @@ -190,7 +190,7 @@ public IndicesClusterStateService( primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, retentionLeaseSyncer, - remoteRefreshSegmentPressureService + remoteStorePressureService ); } @@ -213,7 +213,7 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -225,7 +225,7 @@ public IndicesClusterStateService( indexEventListeners.add(segmentReplicationSourceService); // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - indexEventListeners.add(remoteRefreshSegmentPressureService); + indexEventListeners.add(remoteStorePressureService); } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); @@ -240,7 +240,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } @Override @@ -683,7 +683,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteRefreshSegmentPressureService + remoteStorePressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1042,7 +1042,7 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + RemoteStorePressureService remoteStorePressureService ) throws IOException; /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 0c081ee238e2d..3615ccbb9a96b 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -114,7 +114,7 @@ static ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { static void compareStatsResponse( Map statsObject, - RemoteSegmentTransferTracker.Stats statsTracker, + RemoteSegmentTransferTracker.Stats segmentStatsTracker, ShardRouting routing ) { assertEquals( @@ -134,135 +134,135 @@ static void compareStatsResponse( Map segmentDownloads = ((Map) segment.get(RemoteStoreStats.SubFields.DOWNLOAD)); Map segmentUploads = ((Map) segment.get(RemoteStoreStats.SubFields.UPLOAD)); - if (statsTracker.directoryFileTransferTrackerStats.transferredBytesStarted != 0) { + if (segmentStatsTracker.directoryFileTransferTrackerStats.transferredBytesStarted != 0) { assertEquals( segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.LAST_SYNC_TIMESTAMP), - (int) statsTracker.directoryFileTransferTrackerStats.lastTransferTimestampMs + (int) segmentStatsTracker.directoryFileTransferTrackerStats.lastTransferTimestampMs ); assertEquals( ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( RemoteStoreStats.SubFields.STARTED ), - (int) statsTracker.directoryFileTransferTrackerStats.transferredBytesStarted + (int) segmentStatsTracker.directoryFileTransferTrackerStats.transferredBytesStarted ); assertEquals( ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( RemoteStoreStats.SubFields.SUCCEEDED ), - (int) statsTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded + (int) segmentStatsTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded ); assertEquals( ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( RemoteStoreStats.SubFields.FAILED ), - (int) statsTracker.directoryFileTransferTrackerStats.transferredBytesFailed + (int) segmentStatsTracker.directoryFileTransferTrackerStats.transferredBytesFailed ); assertEquals( ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)).get( RemoteStoreStats.SubFields.LAST_SUCCESSFUL ), - (int) statsTracker.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes + (int) segmentStatsTracker.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes ); assertEquals( ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)).get( RemoteStoreStats.SubFields.MOVING_AVG ), - statsTracker.directoryFileTransferTrackerStats.transferredBytesMovingAverage + segmentStatsTracker.directoryFileTransferTrackerStats.transferredBytesMovingAverage ); assertEquals( ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC)).get( RemoteStoreStats.SubFields.MOVING_AVG ), - statsTracker.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage + segmentStatsTracker.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage ); } else { assertTrue(segmentDownloads.isEmpty()); } - if (statsTracker.totalUploadsStarted != 0) { + if (segmentStatsTracker.totalUploadsStarted != 0) { assertEquals( segmentUploads.get(RemoteStoreStats.UploadStatsFields.LOCAL_REFRESH_TIMESTAMP), - (int) statsTracker.localRefreshClockTimeMs + (int) segmentStatsTracker.localRefreshClockTimeMs ); assertEquals( segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_TIMESTAMP), - (int) statsTracker.remoteRefreshClockTimeMs + (int) segmentStatsTracker.remoteRefreshClockTimeMs ); assertEquals( segmentUploads.get(RemoteStoreStats.UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS), - (int) statsTracker.refreshTimeLagMs + (int) segmentStatsTracker.refreshTimeLagMs ); assertEquals( segmentUploads.get(RemoteStoreStats.UploadStatsFields.REFRESH_LAG), - (int) (statsTracker.localRefreshNumber - statsTracker.remoteRefreshNumber) + (int) (segmentStatsTracker.localRefreshNumber - segmentStatsTracker.remoteRefreshNumber) ); - assertEquals(segmentUploads.get(RemoteStoreStats.UploadStatsFields.BYTES_LAG), (int) statsTracker.bytesLag); + assertEquals(segmentUploads.get(RemoteStoreStats.UploadStatsFields.BYTES_LAG), (int) segmentStatsTracker.bytesLag); assertEquals( segmentUploads.get(RemoteStoreStats.UploadStatsFields.BACKPRESSURE_REJECTION_COUNT), - (int) statsTracker.rejectionCount + (int) segmentStatsTracker.rejectionCount ); assertEquals( segmentUploads.get(RemoteStoreStats.UploadStatsFields.CONSECUTIVE_FAILURE_COUNT), - (int) statsTracker.consecutiveFailuresCount + (int) segmentStatsTracker.consecutiveFailuresCount ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( RemoteStoreStats.SubFields.STARTED ), - (int) statsTracker.uploadBytesStarted + (int) segmentStatsTracker.uploadBytesStarted ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( RemoteStoreStats.SubFields.SUCCEEDED ), - (int) statsTracker.uploadBytesSucceeded + (int) segmentStatsTracker.uploadBytesSucceeded ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( RemoteStoreStats.SubFields.FAILED ), - (int) statsTracker.uploadBytesFailed + (int) segmentStatsTracker.uploadBytesFailed ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)).get( RemoteStoreStats.SubFields.MOVING_AVG ), - statsTracker.uploadBytesMovingAverage + segmentStatsTracker.uploadBytesMovingAverage ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)).get( RemoteStoreStats.SubFields.LAST_SUCCESSFUL ), - (int) statsTracker.lastSuccessfulRemoteRefreshBytes + (int) segmentStatsTracker.lastSuccessfulRemoteRefreshBytes ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)).get( RemoteStoreStats.SubFields.MOVING_AVG ), - statsTracker.uploadBytesPerSecMovingAverage + segmentStatsTracker.uploadBytesPerSecMovingAverage ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)).get( RemoteStoreStats.SubFields.STARTED ), - (int) statsTracker.totalUploadsStarted + (int) segmentStatsTracker.totalUploadsStarted ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)).get( RemoteStoreStats.SubFields.SUCCEEDED ), - (int) statsTracker.totalUploadsSucceeded + (int) segmentStatsTracker.totalUploadsSucceeded ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)).get(RemoteStoreStats.SubFields.FAILED), - (int) statsTracker.totalUploadsFailed + (int) segmentStatsTracker.totalUploadsFailed ); assertEquals( ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS)).get( RemoteStoreStats.SubFields.MOVING_AVG ), - statsTracker.uploadTimeMovingAverage + segmentStatsTracker.uploadTimeMovingAverage ); } else { assertTrue(segmentUploads.isEmpty()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java index 3597a5350e1fb..3f3b1137f4ce5 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java @@ -49,65 +49,46 @@ public void tearDown() throws Exception { } public void testXContentBuilderWithPrimaryShard() throws IOException { - RemoteSegmentTransferTracker.Stats uploadStats = createStatsForNewPrimary(shardId); + RemoteSegmentTransferTracker.Stats segmentStats = createStatsForNewPrimary(shardId); ShardRouting routing = createShardRouting(shardId, true); - RemoteStoreStats stats = new RemoteStoreStats(uploadStats, routing); + RemoteStoreStats stats = new RemoteStoreStats(segmentStats, routing); XContentBuilder builder = XContentFactory.jsonBuilder(); stats.toXContent(builder, EMPTY_PARAMS); Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); - compareStatsResponse(jsonObject, uploadStats, routing); + compareStatsResponse(jsonObject, segmentStats, routing); } public void testXContentBuilderWithReplicaShard() throws IOException { - RemoteSegmentTransferTracker.Stats downloadStats = createStatsForNewReplica(shardId); + RemoteSegmentTransferTracker.Stats segmentStats = createStatsForNewReplica(shardId); ShardRouting routing = createShardRouting(shardId, false); - RemoteStoreStats stats = new RemoteStoreStats(downloadStats, routing); + RemoteStoreStats stats = new RemoteStoreStats(segmentStats, routing); XContentBuilder builder = XContentFactory.jsonBuilder(); stats.toXContent(builder, EMPTY_PARAMS); Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); - compareStatsResponse(jsonObject, downloadStats, routing); + compareStatsResponse(jsonObject, segmentStats, routing); } public void testXContentBuilderWithRemoteStoreRestoredShard() throws IOException { - RemoteSegmentTransferTracker.Stats remotestoreRestoredShardStats = createStatsForRemoteStoreRestoredPrimary(shardId); + RemoteSegmentTransferTracker.Stats segmentStats = createStatsForRemoteStoreRestoredPrimary(shardId); ShardRouting routing = createShardRouting(shardId, true); - RemoteStoreStats stats = new RemoteStoreStats(remotestoreRestoredShardStats, routing); + RemoteStoreStats stats = new RemoteStoreStats(segmentStats, routing); XContentBuilder builder = XContentFactory.jsonBuilder(); stats.toXContent(builder, EMPTY_PARAMS); Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); - compareStatsResponse(jsonObject, remotestoreRestoredShardStats, routing); + compareStatsResponse(jsonObject, segmentStats, routing); } public void testSerializationForPrimaryShard() throws Exception { - RemoteSegmentTransferTracker.Stats primaryShardStats = createStatsForNewPrimary(shardId); - RemoteStoreStats stats = new RemoteStoreStats(primaryShardStats, createShardRouting(shardId, true)); + RemoteSegmentTransferTracker.Stats segmentStats = createStatsForNewPrimary(shardId); + RemoteStoreStats stats = new RemoteStoreStats(segmentStats, createShardRouting(shardId, true)); try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteStoreStats(in).getStats(); - assertEquals(stats.getStats().refreshTimeLagMs, deserializedStats.refreshTimeLagMs); - assertEquals(stats.getStats().localRefreshNumber, deserializedStats.localRefreshNumber); - assertEquals(stats.getStats().remoteRefreshNumber, deserializedStats.remoteRefreshNumber); - assertEquals(stats.getStats().uploadBytesStarted, deserializedStats.uploadBytesStarted); - assertEquals(stats.getStats().uploadBytesSucceeded, deserializedStats.uploadBytesSucceeded); - assertEquals(stats.getStats().uploadBytesFailed, deserializedStats.uploadBytesFailed); - assertEquals(stats.getStats().totalUploadsStarted, deserializedStats.totalUploadsStarted); - assertEquals(stats.getStats().totalUploadsFailed, deserializedStats.totalUploadsFailed); - assertEquals(stats.getStats().totalUploadsSucceeded, deserializedStats.totalUploadsSucceeded); - assertEquals(stats.getStats().rejectionCount, deserializedStats.rejectionCount); - assertEquals(stats.getStats().consecutiveFailuresCount, deserializedStats.consecutiveFailuresCount); - assertEquals(stats.getStats().uploadBytesMovingAverage, deserializedStats.uploadBytesMovingAverage, 0); - assertEquals(stats.getStats().uploadBytesPerSecMovingAverage, deserializedStats.uploadBytesPerSecMovingAverage, 0); - assertEquals(stats.getStats().uploadTimeMovingAverage, deserializedStats.uploadTimeMovingAverage, 0); - assertEquals(stats.getStats().bytesLag, deserializedStats.bytesLag); - assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted); - assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.transferredBytesFailed); - assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded); - assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes); - assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.lastTransferTimestampMs); + RemoteStoreStats deserializedStats = new RemoteStoreStats(in); + assertEquals(stats.getSegmentStats(), deserializedStats.getSegmentStats()); } } } @@ -118,49 +99,8 @@ public void testSerializationForReplicaShard() throws Exception { try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteStoreStats(in).getStats(); - assertEquals(0, deserializedStats.refreshTimeLagMs); - assertEquals(0, deserializedStats.localRefreshNumber); - assertEquals(0, deserializedStats.remoteRefreshNumber); - assertEquals(0, deserializedStats.uploadBytesStarted); - assertEquals(0, deserializedStats.uploadBytesSucceeded); - assertEquals(0, deserializedStats.uploadBytesFailed); - assertEquals(0, deserializedStats.totalUploadsStarted); - assertEquals(0, deserializedStats.totalUploadsFailed); - assertEquals(0, deserializedStats.totalUploadsSucceeded); - assertEquals(0, deserializedStats.rejectionCount); - assertEquals(0, deserializedStats.consecutiveFailuresCount); - assertEquals(0, deserializedStats.bytesLag); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesFailed, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesFailed - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes, - deserializedStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.lastTransferTimestampMs, - deserializedStats.directoryFileTransferTrackerStats.lastTransferTimestampMs - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, - 0 - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesMovingAverage, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage, - 0 - ); + RemoteStoreStats deserializedStats = new RemoteStoreStats(in); + assertEquals(stats.getSegmentStats(), deserializedStats.getSegmentStats()); } } } @@ -171,52 +111,8 @@ public void testSerializationForRemoteStoreRestoredPrimaryShard() throws Excepti try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteStoreStats(in).getStats(); - assertEquals(stats.getStats().refreshTimeLagMs, deserializedStats.refreshTimeLagMs); - assertEquals(stats.getStats().localRefreshNumber, deserializedStats.localRefreshNumber); - assertEquals(stats.getStats().remoteRefreshNumber, deserializedStats.remoteRefreshNumber); - assertEquals(stats.getStats().uploadBytesStarted, deserializedStats.uploadBytesStarted); - assertEquals(stats.getStats().uploadBytesSucceeded, deserializedStats.uploadBytesSucceeded); - assertEquals(stats.getStats().uploadBytesFailed, deserializedStats.uploadBytesFailed); - assertEquals(stats.getStats().totalUploadsStarted, deserializedStats.totalUploadsStarted); - assertEquals(stats.getStats().totalUploadsFailed, deserializedStats.totalUploadsFailed); - assertEquals(stats.getStats().totalUploadsSucceeded, deserializedStats.totalUploadsSucceeded); - assertEquals(stats.getStats().rejectionCount, deserializedStats.rejectionCount); - assertEquals(stats.getStats().consecutiveFailuresCount, deserializedStats.consecutiveFailuresCount); - assertEquals(stats.getStats().uploadBytesMovingAverage, deserializedStats.uploadBytesMovingAverage, 0); - assertEquals(stats.getStats().uploadBytesPerSecMovingAverage, deserializedStats.uploadBytesPerSecMovingAverage, 0); - assertEquals(stats.getStats().uploadTimeMovingAverage, deserializedStats.uploadTimeMovingAverage, 0); - assertEquals(stats.getStats().bytesLag, deserializedStats.bytesLag); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesFailed, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesFailed - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes, - deserializedStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.lastTransferTimestampMs, - deserializedStats.directoryFileTransferTrackerStats.lastTransferTimestampMs - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, - 0 - ); - assertEquals( - stats.getStats().directoryFileTransferTrackerStats.transferredBytesMovingAverage, - deserializedStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage, - 0 - ); + RemoteStoreStats deserializedStats = new RemoteStoreStats(in); + assertEquals(stats.getSegmentStats(), deserializedStats.getSegmentStats()); } } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java index aa3e7ab1fb2c7..8b1f838732f8f 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java @@ -28,7 +28,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; @@ -52,7 +52,7 @@ public class TransportRemoteStoreStatsActionTests extends IndexShardTestCase { private IndicesService indicesService; - private RemoteRefreshSegmentPressureService pressureService; + private RemoteStorePressureService pressureService; private IndexMetadata remoteStoreIndexMetadata; private TransportService transportService; private ClusterService clusterService; @@ -66,7 +66,7 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); IndexService indexService = mock(IndexService.class); clusterService = mock(ClusterService.class); - pressureService = mock(RemoteRefreshSegmentPressureService.class); + pressureService = mock(RemoteStorePressureService.class); MockTransport mockTransport = new MockTransport(); localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); remoteStoreIndexMetadata = IndexMetadata.builder(INDEX.getName()) diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 73bf93bcf5a9c..3706f8c3848c5 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -77,7 +77,7 @@ import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -1073,7 +1073,7 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1104,7 +1104,7 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1135,7 +1135,7 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1177,7 +1177,7 @@ private TransportShardBulkAction createAction() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index c259129338702..94934d5b4dca6 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -27,7 +27,7 @@ public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase { - private RemoteRefreshSegmentPressureSettings pressureSettings; + private RemoteStorePressureSettings pressureSettings; private ClusterService clusterService; @@ -48,11 +48,7 @@ public void setUp() throws Exception { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - pressureSettings = new RemoteRefreshSegmentPressureSettings( - clusterService, - Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) - ); + pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, mock(RemoteStorePressureService.class)); shardId = new ShardId("index", "uuid", 0); directoryFileTransferTracker = new DirectoryFileTransferTracker(); } @@ -407,87 +403,90 @@ public void testComputeBytesLag() { } public void testIsUploadBytesAverageReady() { + int uploadBytesMovingAverageWindowSize = pressureSettings.getUploadBytesMovingAverageWindowSize(); pressureTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, - pressureSettings.getUploadBytesMovingAverageWindowSize(), + uploadBytesMovingAverageWindowSize, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); assertFalse(pressureTracker.isUploadBytesAverageReady()); long sum = 0; - for (int i = 1; i < 20; i++) { + for (int i = 1; i < uploadBytesMovingAverageWindowSize; i++) { pressureTracker.addUploadBytes(i); sum += i; assertFalse(pressureTracker.isUploadBytesAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadBytesAverage(), 0.0d); } - pressureTracker.addUploadBytes(20); - sum += 20; + pressureTracker.addUploadBytes(uploadBytesMovingAverageWindowSize); + sum += uploadBytesMovingAverageWindowSize; assertTrue(pressureTracker.isUploadBytesAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getUploadBytesAverage(), 0.0d); + assertEquals((double) sum / uploadBytesMovingAverageWindowSize, pressureTracker.getUploadBytesAverage(), 0.0d); pressureTracker.addUploadBytes(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getUploadBytesAverage(), 0.0d); + assertEquals((double) sum / uploadBytesMovingAverageWindowSize, pressureTracker.getUploadBytesAverage(), 0.0d); } public void testIsUploadBytesPerSecAverageReady() { + int uploadBytesPerSecMovingAverageWindowSize = pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(); pressureTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + uploadBytesPerSecMovingAverageWindowSize, pressureSettings.getUploadTimeMovingAverageWindowSize() ); assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); long sum = 0; - for (int i = 1; i < 20; i++) { + for (int i = 1; i < uploadBytesPerSecMovingAverageWindowSize; i++) { pressureTracker.addUploadBytesPerSec(i); sum += i; assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); } - pressureTracker.addUploadBytesPerSec(20); - sum += 20; + pressureTracker.addUploadBytesPerSec(uploadBytesPerSecMovingAverageWindowSize); + sum += uploadBytesPerSecMovingAverageWindowSize; assertTrue(pressureTracker.isUploadBytesPerSecAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); pressureTracker.addUploadBytesPerSec(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); } public void testIsUploadTimeMsAverageReady() { + int uploadTimeMovingAverageWindowSize = pressureSettings.getUploadTimeMovingAverageWindowSize(); pressureTracker = new RemoteSegmentTransferTracker( shardId, directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + uploadTimeMovingAverageWindowSize ); assertFalse(pressureTracker.isUploadTimeMsAverageReady()); long sum = 0; - for (int i = 1; i < 20; i++) { + for (int i = 1; i < uploadTimeMovingAverageWindowSize; i++) { pressureTracker.addUploadTimeMs(i); sum += i; assertFalse(pressureTracker.isUploadTimeMsAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadTimeMsAverage(), 0.0d); } - pressureTracker.addUploadTimeMs(20); - sum += 20; + pressureTracker.addUploadTimeMs(uploadTimeMovingAverageWindowSize); + sum += uploadTimeMovingAverageWindowSize; assertTrue(pressureTracker.isUploadTimeMsAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); + assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); pressureTracker.addUploadTimeMs(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); + assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); } public void testIsDownloadBytesAverageReady() { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java similarity index 90% rename from server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index 0e23ce4d8b9dc..d79e5ae99b696 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -31,7 +31,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase { +public class RemoteStorePressureServiceTests extends OpenSearchTestCase { private ClusterService clusterService; @@ -39,7 +39,7 @@ public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase private ShardId shardId; - private RemoteRefreshSegmentPressureService pressureService; + private RemoteStorePressureService pressureService; @Override public void setUp() throws Exception { @@ -60,11 +60,11 @@ public void tearDown() throws Exception { } public void testIsSegmentsUploadBackpressureEnabled() { - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); assertFalse(pressureService.isSegmentsUploadBackpressureEnabled()); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), "true") + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), "true") .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -73,21 +73,21 @@ public void testIsSegmentsUploadBackpressureEnabled() { public void testAfterIndexShardCreatedForRemoteBackedIndex() { IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { IndexShard indexShard = createIndexShard(shardId, false); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardClosed() { IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); @@ -98,7 +98,7 @@ public void testAfterIndexShardClosed() { public void testValidateSegmentUploadLag() { // Create the pressure tracker IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); RemoteSegmentTransferTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java similarity index 65% rename from server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index 75b5b946e8bf8..9c5ec69cf6be9 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -public class RemoteRefreshSegmentPressureSettingsTests extends OpenSearchTestCase { +public class RemoteStorePressureSettingsTests extends OpenSearchTestCase { private ClusterService clusterService; @@ -45,10 +45,10 @@ public void tearDown() throws Exception { } public void testGetDefaultSettings() { - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); // Check remote refresh segment pressure enabled is false @@ -75,18 +75,18 @@ public void testGetDefaultSettings() { public void testGetConfiguredSettings() { Settings settings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, settings, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); // Check remote refresh segment pressure enabled is true @@ -112,20 +112,20 @@ public void testGetConfiguredSettings() { } public void testUpdateAfterGetDefaultSettings() { - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -153,27 +153,27 @@ public void testUpdateAfterGetDefaultSettings() { public void testUpdateAfterGetConfiguredSettings() { Settings settings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, settings, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -208,7 +208,7 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); - RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class); + RemoteStorePressureService pressureService = mock(RemoteStorePressureService.class); // Upload bytes doAnswer(invocation -> { @@ -228,15 +228,11 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { return null; }).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt()); - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( - clusterService, - Settings.EMPTY, - pressureService - ); + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, pressureService); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) .build(); clusterService.getClusterSettings().applySettings(newSettings); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 73bfc12bffb8d..0386eed2a6597 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -89,7 +89,6 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.Assertions; -import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -1822,7 +1821,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build() ); - RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteRefreshSegmentPressureService() + RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteStorePressureService() .getRemoteRefreshSegmentTracker(shard.shardId); populateSampleRemoteStoreStats(remoteRefreshSegmentTracker); ShardStats shardStats = new ShardStats( diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 2b2b1d744d061..549fca5d81d13 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -27,7 +27,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngineFactory; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; @@ -60,7 +60,7 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; - private RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private RemoteStorePressureService remoteStorePressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -84,9 +84,9 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); - remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); - RemoteSegmentTransferTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); + remoteStorePressureService.afterIndexShardCreated(indexShard); + RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); } @@ -317,14 +317,14 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 0); } @@ -338,14 +338,14 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } @@ -384,14 +384,14 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 2); } @@ -406,9 +406,9 @@ private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segme } public void testTrackerData() throws Exception { - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); RemoteStoreRefreshListener listener = tuple.v1(); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteSegmentTransferTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLag(tracker); indexDocs(100, randomIntBetween(100, 200)); @@ -431,13 +431,12 @@ private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getTotalUploadsFailed()); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( - int succeedOnAttempt - ) throws IOException { + private Tuple mockIndexShardWithRetryAndScheduleRefresh(int succeedOnAttempt) + throws IOException { return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, null, null); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch @@ -446,7 +445,7 @@ private Tuple m return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch, @@ -533,17 +532,14 @@ private Tuple m new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService( - clusterService, - Settings.EMPTY - ); + RemoteStorePressureService remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); - remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); - RemoteSegmentTransferTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteStorePressureService.afterIndexShardCreated(shard); + RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); refreshListener.afterRefresh(true); - return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); + return Tuple.tuple(refreshListener, remoteStorePressureService); } public static class TestFilterDirectory extends FilterDirectory { diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 3cc374fa1bfbf..f0db8880dd25b 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,7 +46,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -264,7 +264,7 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index c3784028d5ba8..106fe9098562f 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -177,7 +177,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -2125,7 +2125,7 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); final SystemIndices systemIndices = new SystemIndices(emptyMap()); @@ -2176,7 +2176,7 @@ public void onFailure(final Exception e) { mock(ShardStateAction.class), mock(ThreadPool.class) ), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), new SystemIndices(emptyMap()) ); actions.put( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 87427761f6be0..8348584379f9c 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -97,7 +97,7 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -640,7 +640,7 @@ protected IndexShard newShard( clusterSettings ); Store remoteStore = null; - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + RemoteStorePressureService remoteStorePressureService = null; RepositoriesService mockRepoSvc = mock(RepositoriesService.class); if (indexSettings.isRemoteStoreEnabled()) { @@ -655,7 +655,7 @@ protected IndexShard newShard( remoteStore = createRemoteStore(remotePath, routing, indexMetadata); - remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); + remoteStorePressureService = new RemoteStorePressureService(clusterService, indexSettings.getSettings()); BlobStoreRepository repo = createRepository(remotePath); when(mockRepoSvc.repository(any())).thenAnswer(invocationOnMock -> repo); } @@ -695,11 +695,11 @@ protected IndexShard newShard( translogFactorySupplier, checkpointPublisher, remoteStore, - remoteRefreshSegmentPressureService + remoteStorePressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); - if (remoteRefreshSegmentPressureService != null) { - remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + if (remoteStorePressureService != null) { + remoteStorePressureService.afterIndexShardCreated(indexShard); } success = true; } finally {