diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 332b0d1698800..c674285cc2853 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -8,6 +8,10 @@ package org.opensearch.index.remote; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.logging.Loggers; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -17,7 +21,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; -import java.util.HashMap; +import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -25,6 +29,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES; + /** * Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics. * @@ -32,6 +38,8 @@ */ public class RemoteRefreshSegmentTracker { + private final Logger logger; + /** * ShardId for which this instance tracks the remote segment upload metadata. */ @@ -123,14 +131,14 @@ public class RemoteRefreshSegmentTracker { private final Map rejectionCountMap = ConcurrentCollections.newConcurrentMap(); /** - * Map of name to size of the segment files created as part of the most recent refresh. + * Keeps track of segment files and their size in bytes which are part of the most recent refresh. */ - private volatile Map latestLocalFileNameLengthMap; + private final Map latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap(); /** * Set of names of segment files that were uploaded as part of the most recent remote refresh. */ - private final Set latestUploadedFiles = new HashSet<>(); + private final Set latestUploadedFiles = ConcurrentCollections.newConcurrentSet(); /** * Keeps the bytes lag computed so that we do not compute it for every request. @@ -175,6 +183,7 @@ public RemoteRefreshSegmentTracker( int uploadBytesPerSecMovingAverageWindowSize, int uploadTimeMsMovingAverageWindowSize ) { + logger = Loggers.getLogger(getClass(), shardId); this.shardId = shardId; // Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises. long currentClockTimeMs = System.currentTimeMillis(); @@ -186,8 +195,6 @@ public RemoteRefreshSegmentTracker( uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); - - latestLocalFileNameLengthMap = new HashMap<>(); } ShardId getShardId() { @@ -361,12 +368,43 @@ long getRejectionCount(String rejectionReason) { return rejectionCountMap.get(rejectionReason).get(); } - Map getLatestLocalFileNameLengthMap() { + public Map getLatestLocalFileNameLengthMap() { return latestLocalFileNameLengthMap; } - public void setLatestLocalFileNameLengthMap(Map latestLocalFileNameLengthMap) { - this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap; + /** + * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files. + * + * @param segmentFiles list of local refreshed segment files + * @param fileSizeFunction function is used to determine the file size in bytes + */ + + /** + * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files. + * + * @param segmentFiles list of local refreshed segment files + * @param fileSizeFunction function is used to determine the file size in bytes + */ + public void updateLatestLocalFileNameLengthMap( + Collection segmentFiles, + CheckedFunction fileSizeFunction + ) { + // Update the map + segmentFiles.stream() + .filter(file -> EXCLUDE_FILES.contains(file) == false) + .filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0) + .forEach(file -> { + long fileSize = 0; + try { + fileSize = fileSizeFunction.apply(file); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); + } + latestLocalFileNameLengthMap.put(file, fileSize); + }); + Set fileSet = new HashSet<>(segmentFiles); + // Remove keys from the fileSizeMap that do not exist in the latest segment files + latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); computeBytesLag(); } @@ -382,7 +420,7 @@ public void setLatestUploadedFiles(Set files) { } private void computeBytesLag() { - if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) { + if (latestLocalFileNameLengthMap.isEmpty()) { return; } Set filesNotYetUploaded = latestLocalFileNameLengthMap.keySet() diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 76c1f0ae088c9..f2f4438af82a9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3675,6 +3675,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro } if (isRemoteStoreEnabled()) { + internalRefreshListener.add( + new RemoteSegmentTrackerListener(this, remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())) + ); internalRefreshListener.add( new RemoteStoreRefreshListener( this, diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteSegmentTrackerListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteSegmentTrackerListener.java new file mode 100644 index 0000000000000..d9203d3522517 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteSegmentTrackerListener.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.logging.Loggers; +import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; + +import java.io.IOException; +import java.util.Collection; + +/** + * This listener updates the remote segment tracker with the segment files of the most recent refresh. This is helpful in + * determining the lag and hence applying rejection on lagging remote uploads. + * + * @opensearch.internal + */ +public class RemoteSegmentTrackerListener implements ReferenceManager.RefreshListener { + + private final Logger logger; + private final IndexShard indexShard; + private final RemoteRefreshSegmentTracker segmentTracker; + private final RemoteSegmentStoreDirectory remoteDirectory; + private final Directory storeDirectory; + private long primaryTerm; + + public RemoteSegmentTrackerListener(IndexShard indexShard, RemoteRefreshSegmentTracker segmentTracker) { + this.indexShard = indexShard; + this.segmentTracker = segmentTracker; + logger = Loggers.getLogger(getClass(), indexShard.shardId()); + storeDirectory = indexShard.store().directory(); + remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) + .getDelegate()).getDelegate(); + if (indexShard.routingEntry().primary()) { + try { + this.remoteDirectory.init(); + } catch (IOException e) { + logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); + } + } + this.primaryTerm = remoteDirectory.getPrimaryTermAtInit(); + } + + @Override + public void beforeRefresh() throws IOException {} + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (didRefresh + || this.primaryTerm != indexShard.getOperationPrimaryTerm() + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { + updateLocalRefreshTimeAndSeqNo(); + try { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + } + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); + updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + } + } catch (Throwable t) { + logger.error("Exception in RemoteSegmentTrackerListener.afterRefresh()", t); + } + } + } + + /** + * Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size. + * + * @param segmentFiles list of segment files that are part of the most recent local refresh. + */ + private void updateLocalSizeMapAndTracker(Collection segmentFiles) { + segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); + } + + /** + * Updates the last refresh time and refresh seq no which is seen by local store. + */ + private void updateLocalRefreshTimeAndSeqNo() { + segmentTracker.updateLocalRefreshClockTimeMs(System.currentTimeMillis()); + segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); + segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 3ea8278038ac5..acc515e58f0d0 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -25,13 +25,11 @@ import org.opensearch.common.logging.Loggers; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.UploadListener; -import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; -import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; @@ -40,7 +38,6 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -69,8 +66,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL */ private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000; - private static final int INVALID_PRIMARY_TERM = -1; - /** * Exponential back off policy with max retry interval. */ @@ -79,8 +74,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS ); - // Visible for testing - static final Set EXCLUDE_FILES = Set.of("write.lock"); + public static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; @@ -91,14 +85,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL private final Map localSegmentChecksumMap; private long primaryTerm; private volatile Iterator backoffDelayIterator; - - /** - * Keeps track of segment files and their size in bytes which are part of the most recent refresh. - */ - private final Map latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap(); - private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final UploadListener statsListener; public RemoteStoreRefreshListener( @@ -113,17 +100,7 @@ public RemoteStoreRefreshListener( this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) .getDelegate()).getDelegate(); localSegmentChecksumMap = new HashMap<>(); - RemoteSegmentMetadata remoteSegmentMetadata = null; - if (indexShard.routingEntry().primary()) { - try { - remoteSegmentMetadata = this.remoteDirectory.init(); - } catch (IOException e) { - logger.error("Exception while initialising RemoteSegmentStoreDirectory", e); - } - } - // initializing primary term with the primary term of latest metadata in remote store. - // if no metadata is present, this value will be initilized with -1. - this.primaryTerm = remoteSegmentMetadata != null ? remoteSegmentMetadata.getPrimaryTerm() : INVALID_PRIMARY_TERM; + this.primaryTerm = remoteDirectory.getPrimaryTermAtInit(); this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; @@ -131,20 +108,20 @@ public RemoteStoreRefreshListener( @Override public void beforeUpload(String file) { // Start tracking the upload bytes started - segmentTracker.addUploadBytesStarted(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); } @Override public void onSuccess(String file) { // Track upload success - segmentTracker.addUploadBytesSucceeded(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); segmentTracker.addToLatestUploadedFiles(file); } @Override public void onFailure(String file) { // Track upload failure - segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); } }; } @@ -161,10 +138,9 @@ public void beforeRefresh() throws IOException {} */ @Override protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { - if (didRefresh && isRetry == false) { - updateLocalRefreshTimeAndSeqNo(); - } boolean successful; + // The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it + // is important to upload the zero state segments so that the restore does not break. if (this.primaryTerm != indexShard.getOperationPrimaryTerm() || didRefresh || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { @@ -293,7 +269,7 @@ private void onSuccessfulSegmentsSync( ReplicationCheckpoint checkpoint ) { // Update latest uploaded segment files name in segment tracker - segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet()); + segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet()); // Update the remote refresh time and refresh seq no updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures @@ -406,15 +382,6 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - /** - * Updates the last refresh time and refresh seq no which is seen by local store. - */ - private void updateLocalRefreshTimeAndSeqNo() { - segmentTracker.updateLocalRefreshClockTimeMs(System.currentTimeMillis()); - segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); - segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); - } - /** * Updates the last refresh time and refresh seq no which is seen by remote store. */ @@ -425,33 +392,12 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo } /** - * Updates map of file name to size of the input segment files. Tries to reuse existing information by caching the size - * data, otherwise uses {@code storeDirectory.fileLength(file)} to get the size. This method also removes from the map - * such files that are not present in the list of segment files given in the input. + * Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size. * - * @param segmentFiles list of segment files for which size needs to be known + * @param segmentFiles list of segment files that are part of the most recent local refresh. */ private void updateLocalSizeMapAndTracker(Collection segmentFiles) { - - // Update the map - segmentFiles.stream() - .filter(file -> !EXCLUDE_FILES.contains(file)) - .filter(file -> !latestFileNameSizeOnLocalMap.containsKey(file) || latestFileNameSizeOnLocalMap.get(file) == 0) - .forEach(file -> { - long fileSize = 0; - try { - fileSize = storeDirectory.fileLength(file); - } catch (IOException e) { - logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); - } - latestFileNameSizeOnLocalMap.put(file, fileSize); - }); - - Set fileSet = new HashSet<>(segmentFiles); - // Remove keys from the fileSizeMap that do not exist in the latest segment files - latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); - // Update the tracker - segmentTracker.setLatestLocalFileNameLengthMap(latestFileNameSizeOnLocalMap); + segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); } private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index d3e8d961337cc..8053f333bcef8 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -70,6 +70,8 @@ * @opensearch.internal */ public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager { + + private static final int INVALID_PRIMARY_TERM = -1; /** * Each segment file is uploaded with unique suffix. * For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG @@ -101,6 +103,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ private Map segmentsUploadedToRemoteStore; + private Long primaryTermAtInit; + private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( new RemoteSegmentMetadataHandler(), RemoteSegmentMetadata.CURRENT_VERSION, @@ -141,6 +145,7 @@ public RemoteSegmentStoreDirectory( public RemoteSegmentMetadata init() throws IOException { RemoteSegmentMetadata remoteSegmentMetadata = readLatestMetadataFile(); if (remoteSegmentMetadata != null) { + this.primaryTermAtInit = remoteSegmentMetadata.getPrimaryTerm(); this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata()); } else { this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(); @@ -813,6 +818,12 @@ private boolean deleteIfEmpty() throws IOException { return true; } + public long getPrimaryTermAtInit() { + // initializing primary term with the primary term of latest metadata in remote store. + // if no metadata is present, this value will be initialized with -1. + return primaryTermAtInit == null ? INVALID_PRIMARY_TERM : primaryTermAtInit; + } + public void close() throws IOException { deleteStaleSegmentsAsync(0); deleteIfEmpty(); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 5ccacd4048596..1817ded832db4 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -13,9 +13,9 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -129,13 +129,14 @@ public void testValidateSegmentUploadLag() { avg = (double) sum.get() / 20; Map nameSizeMap = new HashMap<>(); nameSizeMap.put("a", (long) (12 * avg)); - pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); + pressureTracker.updateLatestLocalFileNameLengthMap(nameSizeMap.keySet(), nameSizeMap::get); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("bytes_lag:114 dynamic_bytes_lag_threshold:95.0")); - nameSizeMap.put("a", (long) (2 * avg)); - pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); + nameSizeMap.clear(); + nameSizeMap.put("b", (long) (2 * avg)); + pressureTracker.updateLatestLocalFileNameLengthMap(nameSizeMap.keySet(), nameSizeMap::get); pressureService.validateSegmentsUploadLag(shardId); // 3. Consecutive failures more than the limit diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index badfeb0d67c05..a24bcde177e46 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -10,9 +10,9 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -306,14 +306,14 @@ public void testComputeBytesLag() { Map fileSizeMap = new HashMap<>(); fileSizeMap.put("a", 100L); fileSizeMap.put("b", 105L); - pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); + pressureTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); assertEquals(205L, pressureTracker.getBytesLag()); pressureTracker.addToLatestUploadedFiles("a"); assertEquals(105L, pressureTracker.getBytesLag()); fileSizeMap.put("c", 115L); - pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); + pressureTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); assertEquals(220L, pressureTracker.getBytesLag()); pressureTracker.addToLatestUploadedFiles("b"); @@ -406,7 +406,7 @@ public void testIsUploadTimeMsAverageReady() { /** * Tests whether RemoteRefreshSegmentTracker.Stats object generated correctly from RemoteRefreshSegmentTracker. - * */ + */ public void testStatsObjectCreation() { pressureTracker = constructTracker(); RemoteRefreshSegmentTracker.Stats pressureTrackerStats = pressureTracker.stats(); @@ -431,7 +431,7 @@ public void testStatsObjectCreation() { /** * Tests whether RemoteRefreshSegmentTracker.Stats object serialize and deserialize is working fine. * This comes into play during internode data transfer. - * */ + */ public void testStatsObjectCreationViaStream() throws IOException { pressureTracker = constructTracker(); RemoteRefreshSegmentTracker.Stats pressureTrackerStats = pressureTracker.stats(); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 6f38f080e5035..44baaaf1ba562 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -51,6 +51,7 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; + private RemoteSegmentTrackerListener remoteSegmentTrackerListener; private RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { @@ -75,11 +76,9 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { ); remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); - remoteStoreRefreshListener = new RemoteStoreRefreshListener( - indexShard, - SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) - ); + RemoteRefreshSegmentTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteSegmentTrackerListener = new RemoteSegmentTrackerListener(indexShard, tracker); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -179,6 +178,7 @@ public void testAfterMultipleCommits() throws IOException { public void testReplica() throws IOException { setup(false, 3); + remoteSegmentTrackerListener.afterRefresh(true); remoteStoreRefreshListener.afterRefresh(true); try (Store remoteStore = indexShard.remoteStore()) { @@ -191,6 +191,7 @@ public void testReplica() throws IOException { public void testReplicaPromotion() throws IOException, InterruptedException { setup(false, 3); + remoteSegmentTrackerListener.afterRefresh(true); remoteStoreRefreshListener.afterRefresh(true); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = @@ -226,6 +227,7 @@ public void onFailure(Exception e) { indexDocs(4, 4); indexShard.refresh("test"); + remoteSegmentTrackerListener.afterRefresh(true); remoteStoreRefreshListener.afterRefresh(true); verifyUploadedSegments(remoteSegmentStoreDirectory); @@ -466,11 +468,10 @@ private Tuple m when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( - shard, - emptyCheckpointPublisher, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) - ); + RemoteRefreshSegmentTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteSegmentTrackerListener trackerListener = new RemoteSegmentTrackerListener(indexShard, tracker); + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); + trackerListener.afterRefresh(true); refreshListener.afterRefresh(true); return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); }