diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index cd5d461584f0f..16bcb0a7721bc 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -8,6 +8,10 @@ package org.opensearch.index.remote; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.logging.Loggers; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -18,7 +22,8 @@ import org.opensearch.index.store.DirectoryFileTransferTracker; import java.io.IOException; -import java.util.HashMap; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -26,6 +31,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES; + /** * Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consists of multiple critical metrics. * @@ -33,6 +40,8 @@ */ public class RemoteSegmentTransferTracker { + private final Logger logger; + /** * ShardId for which this instance tracks the remote segment upload metadata. */ @@ -124,14 +133,15 @@ public class RemoteSegmentTransferTracker { private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap(); /** - * Map of name to size of the segment files created as part of the most recent refresh. + * Keeps track of segment files and their size in bytes which are part of the most recent refresh. */ - private volatile Map<String, Long> latestLocalFileNameLengthMap; + private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap(); /** - * Set of names of segment files that were uploaded as part of the most recent remote refresh. + * This contains the files from the last successful remote refresh along with any ongoing uploads. On a successful + * remote refresh, this is reset to just the files of that refresh. */ - private final Set<String> latestUploadedFiles = new HashSet<>(); + private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet(); /** * Keeps the bytes lag computed so that we do not compute it for every request. @@ -182,6 +192,7 @@ public RemoteSegmentTransferTracker( int uploadBytesPerSecMovingAverageWindowSize, int uploadTimeMsMovingAverageWindowSize ) { + logger = Loggers.getLogger(getClass(), shardId); this.shardId = shardId; // Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis(); @@ -193,8 +204,6 @@ public RemoteSegmentTransferTracker( uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); - - latestLocalFileNameLengthMap = new HashMap<>(); this.directoryFileTransferTracker = directoryFileTransferTracker; } @@ -206,7 +215,8 @@ public long getLocalRefreshSeqNo() { return localRefreshSeqNo; } - public void updateLocalRefreshSeqNo(long localRefreshSeqNo) { + // Visible for testing + void updateLocalRefreshSeqNo(long localRefreshSeqNo) { assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo=" + localRefreshSeqNo + " < " @@ -224,7 +234,17 @@ public long getLocalRefreshClockTimeMs() { return localRefreshClockTimeMs; } - public void updateLocalRefreshTimeMs(long localRefreshTimeMs) { + /** + * Updates the last refresh time and refresh seq no as seen by the local store. + */ + public void updateLocalRefreshTimeAndSeqNo() { + updateLocalRefreshClockTimeMs(System.currentTimeMillis()); + updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); + updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1); + } + + // Visible for testing + void updateLocalRefreshTimeMs(long localRefreshTimeMs) { assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs=" + localRefreshTimeMs + " < " @@ -234,7 +254,7 @@ public void updateLocalRefreshTimeMs(long localRefreshTimeMs) { computeTimeMsLag(); } - public void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) { + private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) { this.localRefreshClockTimeMs = localRefreshClockTimeMs; } @@ -369,12 +389,36 @@ long getRejectionCount(String rejectionReason) { return rejectionCountMap.get(rejectionReason).get(); } - Map<String, Long> getLatestLocalFileNameLengthMap() { - return latestLocalFileNameLengthMap; + public Map<String, Long> getLatestLocalFileNameLengthMap() { + return Collections.unmodifiableMap(latestLocalFileNameLengthMap); } - public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) { - this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap; + /** + * Updates the latestLocalFileNameLengthMap by adding each file name and its size to the map. The given function is used to determine the file size (length in bytes). The input collection holds the segment files of the latest local refresh; stale files that are no longer part of it are removed from the map.
+ * + * @param segmentFiles list of local refreshed segment files + * @param fileSizeFunction function used to determine the file size in bytes + */ + public void updateLatestLocalFileNameLengthMap( + Collection<String> segmentFiles, + CheckedFunction<String, Long, IOException> fileSizeFunction + ) { + // Update the map + segmentFiles.stream() + .filter(file -> EXCLUDE_FILES.contains(file) == false) + .filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0) + .forEach(file -> { + long fileSize = 0; + try { + fileSize = fileSizeFunction.apply(file); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); + } + latestLocalFileNameLengthMap.put(file, fileSize); + }); + Set<String> fileSet = new HashSet<>(segmentFiles); + // Remove keys from the fileSizeMap that do not exist in the latest segment files + latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); computeBytesLag(); } @@ -390,7 +434,7 @@ public void setLatestUploadedFiles(Set<String> files) { } private void computeBytesLag() { - if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) { + if (latestLocalFileNameLengthMap.isEmpty()) { return; } Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet() diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 357b6c2eaa456..85d744e58265f 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -39,7 +39,7 @@ public void beforeRefresh() throws IOException { } @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode() diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java index 10e3e04033da3..9bc105bf13f0a 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java @@ -15,8 +15,10 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -27,11 +29,13 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable { /** - * Total permits = 1 ensures that there is only single instance of performAfterRefresh that is running at a time. + * Total permits = 1 ensures that there is only a single instance of runAfterRefreshWithPermit running at a time. * In case there are use cases where concurrency is required, the total permit variable can be put inside the ctor.
*/ private static final int TOTAL_PERMITS = 1; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS); private final ThreadPool threadPool; @@ -46,57 +50,80 @@ public CloseableRetryableRefreshListener() { } public CloseableRetryableRefreshListener(ThreadPool threadPool) { + assert Objects.nonNull(threadPool); this.threadPool = threadPool; } @Override public final void afterRefresh(boolean didRefresh) throws IOException { - boolean successful; - boolean permitAcquired = semaphore.tryAcquire(); - try { - successful = permitAcquired && performAfterRefresh(didRefresh, false); - } finally { - if (permitAcquired) { - semaphore.release(); - } + if (closed.get()) { + return; } - scheduleRetry(successful, didRefresh, permitAcquired); + runAfterRefreshExactlyOnce(didRefresh); + runAfterRefreshWithPermit(didRefresh, () -> {}); + } + + /** + * The code in this method is executed exactly once. This is meant for running a non-idempotent function which needs to be + * executed immediately when the afterRefresh method is invoked. + * + * @param didRefresh true if the refresh opened a new reference + */ + protected void runAfterRefreshExactlyOnce(boolean didRefresh) { + // No-op: the implementor provides the code } + /** + * The implementor has the option to override the retry thread pool name. This will be used for scheduling the retries. + * The method is invoked each time a retry is required. By default, the same thread pool is used for retries. + * + * @return the name of the retry thread pool. + */ protected String getRetryThreadPoolName() { - return null; + return ThreadPool.Names.SAME; } + /** + * By default, the retry interval is 1s. The implementor has the option to override the retry interval. + * This is used for scheduling the next retry. The method is invoked each time a retry is required. The + * implementor can choose any retry strategy and return the next retry interval accordingly. + * + * @return the interval for the next retry. + */ protected TimeValue getNextRetryInterval() { - return null; + return TimeValue.timeValueSeconds(1); } - private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh, boolean isRetry) { - if (this.threadPool == null - || interval == null - || retryThreadPoolName == null - || ThreadPool.THREAD_POOL_TYPES.containsKey(retryThreadPoolName) == false - || interval == TimeValue.MINUS_ONE - || retryScheduled.compareAndSet(false, true) == false) { + /** + * This method is used to schedule a retry which internally calls the performAfterRefresh method under the available permits. + * + * @param interval interval after which the retry would be invoked + * @param retryThreadPoolName the thread pool name to be used for retry + * @param didRefresh true if the refresh opened a new reference + */ + private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh) { + // If the underlying listener has been closed, then we do not allow even a retry to be scheduled + if (closed.get() || isRetryEnabled() == false) { + return; + } + + assert Objects.nonNull(interval) && ThreadPool.THREAD_POOL_TYPES.containsKey(retryThreadPoolName); + + // If retryScheduled is already true, then we return right away. If not, then we proceed with scheduling + // the retry.
+ if (retryScheduled.getAndSet(true)) { return; } + boolean scheduled = false; try { - this.threadPool.schedule(() -> { - boolean successful; - boolean permitAcquired = semaphore.tryAcquire(); - try { - successful = permitAcquired && performAfterRefresh(didRefresh, isRetry); - } finally { - if (permitAcquired) { - semaphore.release(); - } - retryScheduled.set(false); - } - scheduleRetry(successful, didRefresh, isRetry || permitAcquired); - }, interval, retryThreadPoolName); + this.threadPool.schedule( + () -> runAfterRefreshWithPermit(didRefresh, () -> retryScheduled.set(false)), + interval, + retryThreadPoolName + ); scheduled = true; - getLogger().info("Scheduled retry with didRefresh={} isRetry={}", didRefresh, isRetry); + getLogger().info("Scheduled retry with didRefresh={}", didRefresh); } finally { if (scheduled == false) { retryScheduled.set(false); @@ -104,40 +131,82 @@ private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boole } } + /** + * Returns whether retry is enabled or not. By default, retries are disabled. + * @return true if retry is enabled. + */ + protected boolean isRetryEnabled() { + return false; + } + + /** + * Runs the performAfterRefresh method under a permit. If there are no permits available, then it is a no-op. It also invokes + * the scheduleRetry method with the result of the performAfterRefresh method invocation. + * The synchronized block ensures that if there is a retry or afterRefresh call waiting, then it waits until the previous + * execution finishes. + */ + private synchronized void runAfterRefreshWithPermit(boolean didRefresh, Runnable runFinally) { + if (closed.get()) { + return; + } + boolean successful; + boolean permitAcquired = semaphore.tryAcquire(); + try { + successful = permitAcquired && performAfterRefreshWithPermit(didRefresh); + } finally { + if (permitAcquired) { + semaphore.release(); + } + runFinally.run(); + } + scheduleRetry(successful, didRefresh); + } + /** * Schedules the retry based on the {@code afterRefreshSuccessful} value. * * @param afterRefreshSuccessful is sent true if the performAfterRefresh(..) is successful. * @param didRefresh if the refresh did open a new reference then didRefresh will be true - * @param isRetry if this is a failure or permit was not acquired. */ - private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh, boolean isRetry) { + private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) { if (afterRefreshSuccessful == false) { - scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh, isRetry); + scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh); } } /** - * This method needs to be overridden and be provided with what needs to be run on after refresh. + * This method needs to be overridden and be provided with what needs to be run after refresh with permits. * * @param didRefresh true if the refresh opened a new reference - * @param isRetry true if this is a retry attempt * @return true if a retry is needed else false.
*/ - protected abstract boolean performAfterRefresh(boolean didRefresh, boolean isRetry); + protected abstract boolean performAfterRefreshWithPermit(boolean didRefresh); @Override public final void close() throws IOException { try { if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) { - assert semaphore.availablePermits() == 0; + boolean result = closed.compareAndSet(false, true); + assert result && semaphore.availablePermits() == 0; + getLogger().info("Closed"); } else { - throw new RuntimeException("timeout while closing gated refresh listener"); + throw new TimeoutException("timeout while closing gated refresh listener"); } - } catch (InterruptedException e) { - throw new RuntimeException(e); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException("Failed to close the closeable retryable listener", e); } } protected abstract Logger getLogger(); + + // Visible for testing + + /** + * Returns whether a retry is scheduled or not. + * + * @return true if a retry is currently scheduled. + */ + boolean getRetryScheduledStatus() { + return retryScheduled.get(); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index d56054dd1c42b..1357a77071786 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -25,7 +25,6 @@ import org.opensearch.common.logging.Loggers; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.UploadListener; -import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.remote.RemoteSegmentTransferTracker; @@ -40,7 +39,6 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -79,8 +77,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS ); - // Visible for testing - static final Set<String> EXCLUDE_FILES = Set.of("write.lock"); + public static final Set<String> EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; @@ -91,14 +88,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL private final Map<String, String> localSegmentChecksumMap; private long primaryTerm; private volatile Iterator<TimeValue> backoffDelayIterator; - - /** - * Keeps track of segment files and their size in bytes which are part of the most recent refresh. - */ - private final Map<String, Long> latestFileNameSizeOnLocalMap = ConcurrentCollections.newConcurrentMap(); - private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final UploadListener statsListener; public RemoteStoreRefreshListener( @@ -122,7 +112,7 @@ public RemoteStoreRefreshListener( } } // initializing primary term with the primary term of latest metadata in remote store. - // if no metadata is present, this value will be initilized with -1. + // if no metadata is present, this value will be initialized with -1.
remoteSegmentMetadata.getPrimaryTerm() : INVALID_PRIMARY_TERM; this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); @@ -131,20 +121,20 @@ public RemoteStoreRefreshListener( @Override public void beforeUpload(String file) { // Start tracking the upload bytes started - segmentTracker.addUploadBytesStarted(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); } @Override public void onSuccess(String file) { // Track upload success - segmentTracker.addUploadBytesSucceeded(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); segmentTracker.addToLatestUploadedFiles(file); } @Override public void onFailure(String file) { // Track upload failure - segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); + segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); } }; } @@ -152,6 +142,25 @@ public void onFailure(String file) { @Override public void beforeRefresh() throws IOException {} + @Override + protected void runAfterRefreshExactlyOnce(boolean didRefresh) { + if (shouldSync(didRefresh)) { + segmentTracker.updateLocalRefreshTimeAndSeqNo(); + try { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + this.remoteDirectory.init(); + } + try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { + Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); + updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + } + } catch (Throwable t) { + logger.error("Exception in runAfterRefreshExactlyOnce() method", t); + } + } + } + /** * Upload new segment files created as part of the last refresh to the remote segment store. * This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded. @@ -160,14 +169,11 @@ public void beforeRefresh() throws IOException {} * @return true if the method runs successfully. */ @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { - if (didRefresh && isRetry == false) { - updateLocalRefreshTimeAndSeqNo(); - } + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { boolean successful; - if (this.primaryTerm != indexShard.getOperationPrimaryTerm() - || didRefresh - || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { + // The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it + // is important to upload the zero state segments so that the restore does not break. 
+ if (shouldSync(didRefresh)) { successful = syncSegments(); } else { successful = true; @@ -175,7 +181,13 @@ protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { return successful; } - private synchronized boolean syncSegments() { + private boolean shouldSync(boolean didRefresh) { + return this.primaryTerm != indexShard.getOperationPrimaryTerm() + || didRefresh + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty(); + } + + private boolean syncSegments() { if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) { logger.info( "Skipped syncing segments with primaryMode={} indexShardState={}", @@ -192,10 +204,6 @@ private synchronized boolean syncSegments() { final AtomicBoolean successful = new AtomicBoolean(false); try { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); - } try { // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. @@ -296,7 +304,7 @@ private void onSuccessfulSegmentsSync( ReplicationCheckpoint checkpoint ) { // Update latest uploaded segment files name in segment tracker - segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet()); + segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet()); // Update the remote refresh time and refresh seq no updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures @@ -410,15 +418,6 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - /** - * Updates the last refresh time and refresh seq no which is seen by local store. - */ - private void updateLocalRefreshTimeAndSeqNo() { - segmentTracker.updateLocalRefreshClockTimeMs(System.currentTimeMillis()); - segmentTracker.updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L); - segmentTracker.updateLocalRefreshSeqNo(segmentTracker.getLocalRefreshSeqNo() + 1); - } - /** * Updates the last refresh time and refresh seq no which is seen by remote store. */ @@ -429,33 +428,12 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo } /** - * Updates map of file name to size of the input segment files. Tries to reuse existing information by caching the size - * data, otherwise uses {@code storeDirectory.fileLength(file)} to get the size. This method also removes from the map - * such files that are not present in the list of segment files given in the input. + * Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size. * - * @param segmentFiles list of segment files for which size needs to be known + * @param segmentFiles list of segment files that are part of the most recent local refresh. 
*/ private void updateLocalSizeMapAndTracker(Collection segmentFiles) { - - // Update the map - segmentFiles.stream() - .filter(file -> !EXCLUDE_FILES.contains(file)) - .filter(file -> !latestFileNameSizeOnLocalMap.containsKey(file) || latestFileNameSizeOnLocalMap.get(file) == 0) - .forEach(file -> { - long fileSize = 0; - try { - fileSize = storeDirectory.fileLength(file); - } catch (IOException e) { - logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e); - } - latestFileNameSizeOnLocalMap.put(file, fileSize); - }); - - Set fileSet = new HashSet<>(segmentFiles); - // Remove keys from the fileSizeMap that do not exist in the latest segment files - latestFileNameSizeOnLocalMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); - // Update the tracker - segmentTracker.setLatestLocalFileNameLengthMap(latestFileNameSizeOnLocalMap); + segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); } private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { @@ -475,4 +453,9 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB protected Logger getLogger() { return logger; } + + @Override + protected boolean isRetryEnabled() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9ac5ebb94a5ca..a0d4128301006 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -71,9 +71,11 @@ * caller will be accessing segment files in the same way as {@code FSDirectory}. Apart from storing actual segment files, * remote segment store also keeps track of refresh checkpoints as metadata in a separate path which is handled by * another instance of {@code RemoteDirectory}. + * * @opensearch.internal */ public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager { + /** * Each segment file is uploaded with unique suffix. * For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG @@ -140,6 +142,7 @@ public RemoteSegmentStoreDirectory( * As this cache is specific to an instance of RemoteSegmentStoreDirectory, it is possible that cache becomes stale * if another instance of RemoteSegmentStoreDirectory is used to upload/delete segment files. * It is caller's responsibility to call init() again to ensure that cache is properly updated. + * * @throws IOException if there were any failures in reading the metadata file */ public RemoteSegmentMetadata init() throws IOException { @@ -157,6 +160,7 @@ public RemoteSegmentMetadata init() throws IOException { * remote segment store. * this is currently used to restore snapshots, where we want to copy segment files from a given commit. * TODO: check if we can return read only RemoteSegmentStoreDirectory object from here. + * * @throws IOException if there were any failures in reading the metadata file */ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long commitGeneration) throws IOException { @@ -177,8 +181,9 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c * Refresh metadata files keep track of active segments for the shard at the time of refresh. 
* In order to get the list of segment files uploaded to the remote segment store, we need to read the latest metadata file. * Each metadata file contains a map where - * Key is - Segment local filename and - * Value is - local filename::uploaded filename::checksum + * Key is - Segment local filename and + * Value is - local filename::uploaded filename::checksum + * * @return Map of segment filename to uploaded filename with checksum * @throws IOException if there were any failures in reading the metadata file */ @@ -293,7 +298,7 @@ public void setWrittenByMajor(int writtenByMajor) { * Contains utility methods that provide various parts of metadata filename along with comparator * Each metadata filename is of format: PREFIX__PrimaryTerm__Generation__UUID */ - static class MetadataFilenameUtils { + public static class MetadataFilenameUtils { public static final String SEPARATOR = "__"; public static final String METADATA_PREFIX = "metadata"; @@ -342,6 +347,7 @@ static long getGeneration(String[] filenameTokens) { * Any segment file that is uploaded without corresponding metadata file will not be visible as part of listAll(). * We chose not to return cache entries for listAll as cache can have entries for stale segments as well. * Even if we plan to delete stale segments from remote segment store, it will be a periodic operation. + * * @return segment filenames stored in remote segment store * @throws IOException if there were any failures in reading the metadata file */ @@ -352,6 +358,7 @@ public String[] listAll() throws IOException { /** * Delete segment file from remote segment store. + * * @param name the name of an existing segment file in local filesystem. * @throws IOException if the file exists but could not be deleted. */ @@ -366,8 +373,9 @@ public void deleteFile(String name) throws IOException { /** * Returns the byte length of a segment file in the remote segment store. + * * @param name the name of an existing segment file in local filesystem. - * @throws IOException in case of I/O error + * @throws IOException in case of I/O error * @throws NoSuchFileException if the file does not exist in the cache or remote segment store */ @Override @@ -386,6 +394,7 @@ public long fileLength(String name) throws IOException { /** * Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote * segment store. + * * @param name the name of the file to create. * @throws IOException in case of I/O error */ @@ -396,8 +405,9 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti /** * Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream. + * * @param name the name of an existing file. - * @throws IOException in case of I/O error + * @throws IOException in case of I/O error * @throws NoSuchFileException if the file does not exist either in cache or remote segment store */ @Override @@ -416,10 +426,10 @@ public IndexInput openInput(String name, IOContext context) throws IOException { * will be used, else, the legacy {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext)} * will be called. 
* - * @param from The directory for the file to be uploaded - * @param src File to be uploaded - * @param context IOContext to be used to open IndexInput of file during remote upload - * @param listener Listener to handle upload callback events + * @param from The directory for the file to be uploaded + * @param src File to be uploaded + * @param context IOContext to be used to open IndexInput of file during remote upload + * @param listener Listener to handle upload callback events */ public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { if (remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer) { @@ -495,10 +505,11 @@ private void uploadBlob(Directory from, String src, String remoteFileName, IOCon /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} + * * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @param acquirerId Lock Acquirer ID which wants to acquire lock on the commit. - * @throws IOException will be thrown in case i) listing file failed or ii) Writing the lock file failed. + * @param generation Commit Generation + * @param acquirerId Lock Acquirer ID which wants to acquire lock on the commit. + * @throws IOException will be thrown in case i) listing file failed or ii) Writing the lock file failed. * @throws NoSuchFileException when metadata file is not present for given commit point. */ @Override @@ -509,10 +520,11 @@ public void acquireLock(long primaryTerm, long generation, String acquirerId) th /** * Releases a lock which was acquired on given segment commit. + * * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation - * @param acquirerId Acquirer ID for which lock needs to be released. - * @throws IOException will be thrown in case i) listing lock files failed or ii) deleting the lock file failed. + * @param generation Commit Generation + * @param acquirerId Acquirer ID for which lock needs to be released. + * @throws IOException will be thrown in case i) listing lock files failed or ii) deleting the lock file failed. * @throws NoSuchFileException when metadata file is not present for given commit point. */ @Override @@ -523,10 +535,11 @@ public void releaseLock(long primaryTerm, long generation, String acquirerId) th /** * Checks if a specific commit have any corresponding lock file. + * * @param primaryTerm Primary Term of index at the time of commit. - * @param generation Commit Generation + * @param generation Commit Generation * @return True if there is at least one lock for given primary term and generation. - * @throws IOException will be thrown in case listing lock files failed. + * @throws IOException will be thrown in case listing lock files failed. * @throws NoSuchFileException when metadata file is not present for given commit point. */ @Override @@ -590,8 +603,9 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) * Checks if the file exists in the uploadedSegments cache and the checksum matches. * It is important to match the checksum as the same segment filename can be used for different * segments due to a concurrency issue. + * * @param localFilename filename of segment stored in local filesystem - * @param checksum checksum of the segment file + * @param checksum checksum of the segment file * @return true if file exists in cache and checksum matches. 
*/ public boolean containsFile(String localFilename, String checksum) { @@ -601,7 +615,8 @@ public boolean containsFile(String localFilename, String checksum) { /** * Upload metadata file - * @param segmentFiles segment files that are part of the shard at the time of the latest refresh + * + * @param segmentFiles segment files that are part of the shard at the time of the latest refresh * @param segmentInfosSnapshot SegmentInfos bytes to store as part of metadata file * @param storeDirectory instance of local directory to temporarily create metadata file before upload * @param translogGeneration translog generation @@ -663,7 +678,8 @@ public void uploadMetadata( /** * Parses the provided SegmentInfos to retrieve a mapping of the provided segment files to * the respective Lucene major version that wrote the segments - * @param segmentFiles List of segment files for which the Lucene major version is needed + * + * @param segmentFiles List of segment files for which the Lucene major version is needed * @param segmentInfosSnapshot SegmentInfos instance to parse * @return Map of the segment file to its Lucene major version */ @@ -694,6 +710,7 @@ private Map getSegmentToLuceneVersion(Collection segmen /** * Try to delete file from local store. Fails silently on failures + * * @param filename: name of the file to be deleted */ private void tryAndDeleteLocalFile(String filename, Directory directory) { @@ -759,6 +776,7 @@ public Map getSegmentsUploadedToRemoteStore() { * Delete stale segment and metadata files * One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store, * we just need to read the latest metadata file. All the stale metadata files can be safely deleted. + * * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ @@ -844,6 +862,7 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { /** * Delete stale segment and metadata files asynchronously. * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. 
+ * * @param lastNMetadataFilesToKeep number of metadata files to keep */ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener listener) { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 8d444f5d10f26..0e23ce4d8b9dc 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -13,11 +13,11 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -131,13 +131,14 @@ public void testValidateSegmentUploadLag() { avg = (double) sum.get() / 20; Map nameSizeMap = new HashMap<>(); nameSizeMap.put("a", (long) (12 * avg)); - pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); + pressureTracker.updateLatestLocalFileNameLengthMap(nameSizeMap.keySet(), nameSizeMap::get); e = assertThrows(OpenSearchRejectedExecutionException.class, () -> pressureService.validateSegmentsUploadLag(shardId)); assertTrue(e.getMessage().contains("due to remote segments lagging behind local segments")); assertTrue(e.getMessage().contains("bytes_lag:114 dynamic_bytes_lag_threshold:95.0")); - nameSizeMap.put("a", (long) (2 * avg)); - pressureTracker.setLatestLocalFileNameLengthMap(nameSizeMap); + nameSizeMap.clear(); + nameSizeMap.put("b", (long) (2 * avg)); + pressureTracker.updateLatestLocalFileNameLengthMap(nameSizeMap.keySet(), nameSizeMap::get); pressureService.validateSegmentsUploadLag(shardId); // 3. 
Consecutive failures more than the limit diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index 208cea111f2e1..c259129338702 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -10,9 +10,9 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.test.OpenSearchTestCase; @@ -389,14 +389,14 @@ public void testComputeBytesLag() { Map fileSizeMap = new HashMap<>(); fileSizeMap.put("a", 100L); fileSizeMap.put("b", 105L); - pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); + pressureTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); assertEquals(205L, pressureTracker.getBytesLag()); pressureTracker.addToLatestUploadedFiles("a"); assertEquals(105L, pressureTracker.getBytesLag()); fileSizeMap.put("c", 115L); - pressureTracker.setLatestLocalFileNameLengthMap(fileSizeMap); + pressureTracker.updateLatestLocalFileNameLengthMap(fileSizeMap.keySet(), fileSizeMap::get); assertEquals(220L, pressureTracker.getBytesLag()); pressureTracker.addToLatestUploadedFiles("b"); @@ -573,7 +573,7 @@ public void testStatsObjectCreation() { /** * Tests whether RemoteSegmentTransferTracker.Stats object serialize and deserialize is working fine. * This comes into play during internode data transfer. 
- * */ + */ public void testStatsObjectCreationViaStream() throws IOException { pressureTracker = constructTracker(); RemoteSegmentTransferTracker.Stats pressureTrackerStats = pressureTracker.stats(); diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java index b9df9ed5a13d8..aa53e6d4688a5 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java @@ -19,6 +19,12 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class CloseableRetryableRefreshListenerTests extends OpenSearchTestCase { @@ -37,9 +43,9 @@ public void init() { public void testPerformAfterRefresh() throws IOException { CountDownLatch countDownLatch = new CountDownLatch(2); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(null) { + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return false; } @@ -69,9 +75,9 @@ protected Logger getLogger() { public void testCloseAfterRefresh() throws IOException { final int initialCount = randomIntBetween(10, 100); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(null) { + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return false; } @@ -106,9 +112,9 @@ protected Logger getLogger() { public void testNoRetry() throws IOException { int initialCount = randomIntBetween(10, 100); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(null) { + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return countDownLatch.getCount() == 0; } @@ -127,7 +133,7 @@ protected Logger getLogger() { testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return countDownLatch.getCount() == 0; } @@ -146,7 +152,7 @@ protected Logger getLogger() { testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean 
performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return countDownLatch.getCount() == 0; } @@ -170,7 +176,7 @@ protected Logger getLogger() { testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return countDownLatch.getCount() == 0; } @@ -201,7 +207,7 @@ public void testRetry() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(initialCount); CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return countDownLatch.getCount() == 0; } @@ -223,6 +229,11 @@ protected TimeValue getNextRetryInterval() { protected Logger getLogger() { return logger; } + + @Override + protected boolean isRetryEnabled() { + return true; + } }; testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); @@ -237,7 +248,7 @@ public void testCloseWithRetryPending() throws IOException { final CountDownLatch countDownLatch = new CountDownLatch(initialCount); CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); return countDownLatch.getCount() == 0; } @@ -269,7 +280,7 @@ public void testCloseWaitsForAcquiringAllPermits() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { @Override - protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) { + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { Thread.sleep(5000); } catch (InterruptedException e) { @@ -299,6 +310,149 @@ protected Logger getLogger() { testRefreshListener.close(); } + public void testScheduleRetryAfterClose() throws Exception { + // This tests that once the listener has been closed, even the retries would not be scheduled. 
+ final AtomicLong runCount = new AtomicLong(); + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + runCount.incrementAndGet(); + return false; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected String getRetryThreadPoolName() { + return ThreadPool.Names.REMOTE_REFRESH_RETRY; + } + + @Override + protected TimeValue getNextRetryInterval() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return TimeValue.timeValueMillis(100); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + Thread thread2 = new Thread(() -> { + try { + Thread.sleep(500); + testRefreshListener.close(); + } catch (IOException | InterruptedException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + assertBusy(() -> assertEquals(1, runCount.get())); + } + + public void testConcurrentScheduleRetry() throws Exception { + // This tests that only one retry can be scheduled at a time. + final AtomicLong runCount = new AtomicLong(); + final AtomicInteger retryCount = new AtomicInteger(0); + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + retryCount.incrementAndGet(); + runCount.incrementAndGet(); + return retryCount.get() >= 2; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected String getRetryThreadPoolName() { + return ThreadPool.Names.REMOTE_REFRESH_RETRY; + } + + @Override + protected TimeValue getNextRetryInterval() { + return TimeValue.timeValueMillis(5000); + } + + @Override + protected boolean isRetryEnabled() { + return true; + } + }; + testRefreshListener.afterRefresh(true); + testRefreshListener.afterRefresh(true); + assertBusy(() -> assertEquals(3, runCount.get())); + testRefreshListener.close(); + } + + public void testExceptionDuringThreadPoolSchedule() throws Exception { + // This tests that if there are exceptions while scheduling the task in the threadpool, the retryScheduled boolean + // is reset properly to allow future scheduling to happen.
+ AtomicInteger runCount = new AtomicInteger(); + ThreadPool mockThreadPool = mock(ThreadPool.class); + when(mockThreadPool.schedule(any(), any(), any())).thenThrow(new RuntimeException()); + CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mockThreadPool) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + runCount.incrementAndGet(); + return false; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + protected String getRetryThreadPoolName() { + return ThreadPool.Names.REMOTE_REFRESH_RETRY; + } + + @Override + protected TimeValue getNextRetryInterval() { + return TimeValue.timeValueMillis(100); + } + + @Override + protected boolean isRetryEnabled() { + return true; + } + }; + assertThrows(RuntimeException.class, () -> testRefreshListener.afterRefresh(true)); + assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus())); + assertEquals(1, runCount.get()); + testRefreshListener.close(); + } + @After public void tearDown() throws Exception { super.tearDown(); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 66938eec10513..889c6b0121450 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -24,11 +24,16 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; import org.opensearch.index.remote.RemoteSegmentTransferTracker; +import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -44,8 +49,12 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes; +import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata; public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; @@ -77,11 +86,8 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { ); remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); - remoteStoreRefreshListener = new RemoteStoreRefreshListener( - indexShard, - 
SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) - ); + RemoteSegmentTransferTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -98,6 +104,70 @@ public void tearDown() throws Exception { super.tearDown(); } + public void testRemoteDirectoryInitThrowsException() throws IOException { + // Methods used in the constructor of RemoteStoreRefreshListener have been mocked to reproduce specific exceptions + // to test the failure modes possible during construction of the RemoteStoreRefreshListener object. + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + + // Mocking the IndexShard methods and dependent classes. + ShardId shardId = new ShardId("index1", "_na_", 1); + IndexShard shard = mock(IndexShard.class); + Store store = mock(Store.class); + Directory directory = mock(Directory.class); + ShardRouting shardRouting = mock(ShardRouting.class); + when(shard.store()).thenReturn(store); + when(store.directory()).thenReturn(directory); + when(shard.shardId()).thenReturn(shardId); + when(shard.routingEntry()).thenReturn(shardRouting); + when(shardRouting.primary()).thenReturn(true); + when(shard.getThreadPool()).thenReturn(mock(ThreadPool.class)); + + // Mock the Store, Directory and RemoteSegmentStoreDirectory classes + Store remoteStore = mock(Store.class); + when(shard.remoteStore()).thenReturn(remoteStore); + RemoteDirectory remoteMetadataDirectory = mock(RemoteDirectory.class); + AtomicLong listFilesCounter = new AtomicLong(); + + // Below we are trying to get the IOException thrown in the constructor of the RemoteSegmentStoreDirectory. + doAnswer(invocation -> { + if (listFilesCounter.incrementAndGet() <= 1) { + return Collections.singletonList("dummy string"); + } + throw new IOException(); + }).when(remoteMetadataDirectory).listFilesByPrefixInLexicographicOrder(MetadataFilenameUtils.METADATA_PREFIX, 1); + + SegmentInfos segmentInfos; + try (Store indexShardStore = indexShard.store()) { + segmentInfos = indexShardStore.readLastCommittedSegmentsInfo(); + } + + when(remoteMetadataDirectory.openInput(any(), any())).thenAnswer( + I -> createMetadataFileBytes(getDummyMetadata("_0", 1), indexShard.getLatestReplicationCheckpoint(), segmentInfos) + ); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + mock(RemoteDirectory.class), + remoteMetadataDirectory, + mock(RemoteStoreLockManager.class), + mock(ThreadPool.class) + ); + FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( + new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) + ); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + + // Since the thrown IOException is caught in the constructor, ctor should be invoked successfully.
+        new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class));
+
+        // Validate that openInput on remoteMetadataDirectory was invoked only once and that
+        // listFilesByPrefixInLexicographicOrder was invoked twice.
+        verify(remoteMetadataDirectory, times(1)).openInput(any(), any());
+        verify(remoteMetadataDirectory, times(2)).listFilesByPrefixInLexicographicOrder(MetadataFilenameUtils.METADATA_PREFIX, 1);
+    }
+
     public void testAfterRefresh() throws IOException {
         setup(true, 3);
         assertDocs(indexShard, "1", "2", "3");
@@ -433,7 +503,7 @@ private Tuple m
         AtomicLong counter = new AtomicLong();
         // Mock indexShard.getSegmentInfosSnapshot()
         doAnswer(invocation -> {
-            if (counter.incrementAndGet() <= succeedOnAttempt - 1) {
+            if (counter.incrementAndGet() <= succeedOnAttempt) {
                 throw new RuntimeException("Inducing failure in upload");
             }
             return indexShard.getSegmentInfosSnapshot();
@@ -470,11 +540,8 @@ private Tuple m
         when(shard.indexSettings()).thenReturn(indexShard.indexSettings());
         when(shard.shardId()).thenReturn(indexShard.shardId());
         remoteRefreshSegmentPressureService.afterIndexShardCreated(shard);
-        RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
-            shard,
-            emptyCheckpointPublisher,
-            remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
-        );
+        RemoteSegmentTransferTracker tracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId());
+        RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
         refreshListener.afterRefresh(true);
         return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService);
     }
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
index f2d39b2ac7bee..b2d55171e680d 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java
@@ -13,8 +13,6 @@
 import org.apache.lucene.index.IndexFormatTooNewException;
 import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.store.ByteBuffersDataOutput;
-import org.apache.lucene.store.ByteBuffersIndexOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -43,7 +41,6 @@
 import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
-import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.threadpool.ThreadPool;
@@ -56,20 +53,22 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.startsWith;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.times;
+import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.doReturn;
-import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.Mockito.startsWith;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opensearch.test.RemoteStoreTestUtils.createMetadataFileBytes;
+import static org.opensearch.test.RemoteStoreTestUtils.getDummyMetadata;
 
 public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
     private RemoteDirectory remoteDataDirectory;
@@ -194,93 +193,6 @@ public void testInitNoMetadataFile() throws IOException {
         assertEquals(Set.of(), actualCache.keySet());
     }
 
-    private Map<String, String> getDummyMetadata(String prefix, int commitGeneration) {
-        Map<String, String> metadata = new HashMap<>();
-
-        metadata.put(
-            prefix + ".cfe",
-            prefix
-                + ".cfe::"
-                + prefix
-                + ".cfe__"
-                + UUIDs.base64UUID()
-                + "::"
-                + randomIntBetween(1000, 5000)
-                + "::"
-                + randomIntBetween(512000, 1024000)
-                + "::"
-                + Version.MIN_SUPPORTED_MAJOR
-        );
-        metadata.put(
-            prefix + ".cfs",
-            prefix
-                + ".cfs::"
-                + prefix
-                + ".cfs__"
-                + UUIDs.base64UUID()
-                + "::"
-                + randomIntBetween(1000, 5000)
-                + "::"
-                + randomIntBetween(512000, 1024000)
-                + "::"
-                + Version.MIN_SUPPORTED_MAJOR
-        );
-        metadata.put(
-            prefix + ".si",
-            prefix
-                + ".si::"
-                + prefix
-                + ".si__"
-                + UUIDs.base64UUID()
-                + "::"
-                + randomIntBetween(1000, 5000)
-                + "::"
-                + randomIntBetween(512000, 1024000)
-                + "::"
-                + Version.LATEST.major
-        );
-        metadata.put(
-            "segments_" + commitGeneration,
-            "segments_"
-                + commitGeneration
-                + "::segments_"
-                + commitGeneration
-                + "__"
-                + UUIDs.base64UUID()
-                + "::"
-                + randomIntBetween(1000, 5000)
-                + "::"
-                + randomIntBetween(1024, 5120)
-                + "::"
-                + Version.LATEST.major
-        );
-        return metadata;
-    }
-
-    /**
-     * Prepares metadata file bytes with header and footer
-     * @param segmentFilesMap: actual metadata content
-     * @return ByteArrayIndexInput: metadata file bytes with header and footer
-     * @throws IOException IOException
-     */
-    private ByteArrayIndexInput createMetadataFileBytes(Map<String, String> segmentFilesMap, ReplicationCheckpoint replicationCheckpoint)
-        throws IOException {
-        ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
-        segmentInfos.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "", ""));
-        byte[] byteArray = byteBuffersIndexOutput.toArrayCopy();
-
-        BytesStreamOutput output = new BytesStreamOutput();
-        OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
-        CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION);
-        indexOutput.writeMapOfStrings(segmentFilesMap);
-        RemoteSegmentMetadata.writeCheckpointToIndexOutput(replicationCheckpoint, indexOutput);
-        indexOutput.writeLong(byteArray.length);
-        indexOutput.writeBytes(byteArray, byteArray.length);
-        CodecUtil.writeFooter(indexOutput);
-        indexOutput.close();
-        return new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
-    }
-
     private Map<String, Map<String, String>> populateMetadata() throws IOException {
         List<String> metadataFiles = new ArrayList<>();
 
@@ -311,13 +223,25 @@ private Map<String, Map<String, String>> populateMetadata() throws IOException {
         );
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenAnswer( - I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), indexShard.getLatestReplicationCheckpoint()) + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) ); when(remoteMetadataDirectory.openInput(metadataFilename2, IOContext.DEFAULT)).thenAnswer( - I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), indexShard.getLatestReplicationCheckpoint()) + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename2), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) ); when(remoteMetadataDirectory.openInput(metadataFilename3, IOContext.DEFAULT)).thenAnswer( - I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), indexShard.getLatestReplicationCheckpoint()) + I -> createMetadataFileBytes( + metadataFilenameContentMapping.get(metadataFilename3), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) ); return metadataFilenameContentMapping; @@ -654,7 +578,7 @@ public void testContainsFile() throws IOException { metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major); when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint()) + createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint(), segmentInfos) ); remoteSegmentStoreDirectory.init(); @@ -717,7 +641,11 @@ public void testUploadMetadataNonEmpty() throws IOException { getDummyMetadata("_0", (int) generation) ); when(remoteMetadataDirectory.openInput(latestMetadataFileName, IOContext.DEFAULT)).thenReturn( - createMetadataFileBytes(metadataFilenameContentMapping.get(latestMetadataFileName), indexShard.getLatestReplicationCheckpoint()) + createMetadataFileBytes( + metadataFilenameContentMapping.get(latestMetadataFileName), + indexShard.getLatestReplicationCheckpoint(), + segmentInfos + ) ); remoteSegmentStoreDirectory.init(); diff --git a/test/framework/src/main/java/org/opensearch/test/RemoteStoreTestUtils.java b/test/framework/src/main/java/org/opensearch/test/RemoteStoreTestUtils.java new file mode 100644 index 0000000000000..0744d5fca853b --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/test/RemoteStoreTestUtils.java @@ -0,0 +1,129 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.test;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.store.ByteBuffersIndexOutput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.util.Version;
+import org.opensearch.common.UUIDs;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.lucene.store.ByteArrayIndexInput;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
+
+/**
+ * Utilities for remote store related operations, shared across one or more tests.
+ */
+public final class RemoteStoreTestUtils {
+
+    private RemoteStoreTestUtils() {
+
+    }
+
+    /**
+     * Prepares metadata file bytes with header and footer.
+     *
+     * @param segmentFilesMap actual metadata content, keyed by segment file name
+     * @param replicationCheckpoint replication checkpoint written into the metadata file
+     * @param segmentInfos segment infos whose serialized bytes are appended after the metadata map and checkpoint
+     * @return metadata file bytes with header and footer as a {@link ByteArrayIndexInput}
+     * @throws IOException if writing the metadata file fails
+     */
+    public static ByteArrayIndexInput createMetadataFileBytes(
+        Map<String, String> segmentFilesMap,
+        ReplicationCheckpoint replicationCheckpoint,
+        SegmentInfos segmentInfos
+    ) throws IOException {
+        ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
+        segmentInfos.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "", ""));
+        byte[] byteArray = byteBuffersIndexOutput.toArrayCopy();
+
+        BytesStreamOutput output = new BytesStreamOutput();
+        OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
+        CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION);
+        indexOutput.writeMapOfStrings(segmentFilesMap);
+        RemoteSegmentMetadata.writeCheckpointToIndexOutput(replicationCheckpoint, indexOutput);
+        indexOutput.writeLong(byteArray.length);
+        indexOutput.writeBytes(byteArray, byteArray.length);
+        CodecUtil.writeFooter(indexOutput);
+        indexOutput.close();
+        return new ByteArrayIndexInput("segment metadata", BytesReference.toBytes(output.bytes()));
+    }
+
+    /**
+     * Builds a map of segment file name to a dummy {@code ::}-separated metadata string for the given prefix and commit generation.
+     */
+    public static Map<String, String> getDummyMetadata(String prefix, int commitGeneration) {
+        Map<String, String> metadata = new HashMap<>();
+
+        metadata.put(
+            prefix + ".cfe",
+            prefix
+                + ".cfe::"
+                + prefix
+                + ".cfe__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(512000, 1024000)
+                + "::"
+                + Version.MIN_SUPPORTED_MAJOR
+        );
+        metadata.put(
+            prefix + ".cfs",
+            prefix
+                + ".cfs::"
+                + prefix
+                + ".cfs__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(512000, 1024000)
+                + "::"
+                + Version.MIN_SUPPORTED_MAJOR
+        );
+        metadata.put(
+            prefix + ".si",
+            prefix
+                + ".si::"
+                + prefix
+                + ".si__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(512000, 1024000)
+                + "::"
+                + Version.LATEST.major
+        );
+        metadata.put(
+            "segments_" + commitGeneration,
+            "segments_"
+                + commitGeneration
+                + "::segments_"
+                + commitGeneration
+                + "__"
+                + UUIDs.base64UUID()
+                + "::"
+                + randomIntBetween(1000, 5000)
+                + "::"
+                + randomIntBetween(1024, 5120)
+                + "::"
+                + Version.LATEST.major
+        );
+        return metadata;
+    }
+}
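
Note on the stubbing idiom used in this patch: both testRemoteDirectoryInitThrowsException and the getSegmentInfosSnapshot mock drive a counter inside a Mockito Answer so that a stub succeeds for the first N invocations and fails afterwards. A minimal standalone sketch of that idiom follows; the RemoteLister interface and every name in it are illustrative stand-ins, not OpenSearch APIs.

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    class NthCallFailureExample {

        // Illustrative stand-in for the mocked RemoteDirectory listing method.
        interface RemoteLister {
            List<String> listFiles(String prefix) throws IOException;
        }

        static RemoteLister failAfterFirstCall() throws IOException {
            AtomicLong calls = new AtomicLong();
            RemoteLister stub = mock(RemoteLister.class);
            // The first invocation returns normally; every later one throws,
            // mirroring the listFilesByPrefixInLexicographicOrder stub above.
            doAnswer(invocation -> {
                if (calls.incrementAndGet() <= 1) {
                    return Collections.singletonList("dummy");
                }
                throw new IOException("induced failure");
            }).when(stub).listFiles("prefix");
            return stub;
        }
    }

Because the counter lives outside the Answer, the same stub can express "succeed on attempt N" by comparing against any threshold, which is how succeedOnAttempt is handled in the refresh-listener tests.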
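
For context on the byte layout produced by createMetadataFileBytes: the file is a Lucene codec header, a string map of per-segment metadata, the replication checkpoint, a length-prefixed copy of the SegmentInfos bytes, and a codec footer. Below is a minimal read-side sketch, assuming only the Lucene CodecUtil/DataInput APIs already used on the write side; it is a test-only illustration, not an OpenSearch API, and it deliberately stops before the checkpoint, whose wire format belongs to RemoteSegmentMetadata.

    import java.io.IOException;
    import java.util.Map;

    import org.apache.lucene.codecs.CodecUtil;
    import org.opensearch.common.lucene.store.ByteArrayIndexInput;
    import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;

    final class MetadataBytesSanityCheck {

        // Reads back the header and segment-files map written by createMetadataFileBytes.
        static Map<String, String> readSegmentFilesMap(ByteArrayIndexInput in) throws IOException {
            // Mirrors CodecUtil.writeHeader(out, METADATA_CODEC, CURRENT_VERSION) on the write side.
            CodecUtil.checkHeader(in, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION, RemoteSegmentMetadata.CURRENT_VERSION);
            // Mirrors indexOutput.writeMapOfStrings(segmentFilesMap).
            return in.readMapOfStrings();
            // The replication checkpoint, the SegmentInfos length and bytes, and the
            // codec footer follow; parsing those is RemoteSegmentMetadata's job.
        }

        private MetadataBytesSanityCheck() {}
    }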
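
The values built by getDummyMetadata follow a name::uploadedFileName::checksum::length::luceneMajorVersion layout, as visible in the string concatenation above; the field meanings are inferred from that concatenation, not from an OpenSearch API. A hedged sketch of a shape assertion a test could make against one entry:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    final class DummyMetadataShapeAssertions {

        // Hypothetical helper; splits one getDummyMetadata value into its five fields.
        static void assertDummyEntryShape(String segmentFileName, String value) {
            String[] parts = value.split("::");
            assertEquals(5, parts.length);
            assertEquals(segmentFileName, parts[0]);                  // original file name
            assertTrue(parts[1].startsWith(segmentFileName + "__"));  // uploaded name = name + "__" + UUID
            Long.parseLong(parts[2]);                                 // checksum-like field
            Long.parseLong(parts[3]);                                 // size in bytes
            Integer.parseInt(parts[4]);                               // Lucene major version
        }

        private DummyMetadataShapeAssertions() {}
    }

For example, assertDummyEntryShape("_0.si", getDummyMetadata("_0", 1).get("_0.si")) passes for the values generated above.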