diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 3e82295ecdaa8..50387a3bbb18d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -17,7 +17,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; @@ -150,14 +149,14 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( + @Override + public void getIndexRoutingAsyncAction( ClusterState clusterState, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener, @@ -187,7 +186,7 @@ public CheckedRunnable getIndexRoutingAsyncAction( ) ); - return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener); + uploadIndex(indexRouting, fileName, blobContainer, completionListener); } /** @@ -274,7 +273,7 @@ private void uploadIndex( } @Override - public CheckedRunnable getAsyncIndexRoutingReadAction( + public void getAsyncIndexRoutingReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener @@ -284,7 +283,7 @@ public CheckedRunnable getAsyncIndexRoutingReadAction( BlobContainer blobContainer = blobStoreRepository.blobStore() .blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx))); - return () -> readAsync( + readAsync( blobContainer, blobFileName, index, diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 6236d107d0220..f1cfcb3c66eb0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.index.Index; @@ -42,14 +41,13 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( + public void getIndexRoutingAsyncAction( ClusterState clusterState, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener, BlobPath clusterBasePath ) { // noop - return () -> {}; } @Override @@ -63,13 +61,12 @@ public List getAllUploadedIndices } @Override - public CheckedRunnable getAsyncIndexRoutingReadAction( + public void getAsyncIndexRoutingReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener ) { // noop - return () -> {}; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d455dfb58eabc..fb09dc9449994 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; @@ -46,7 +45,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { List getIndicesRouting(RoutingTable routingTable); - CheckedRunnable getAsyncIndexRoutingReadAction( + void getAsyncIndexRoutingReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener @@ -62,7 +61,7 @@ DiffableUtils.MapDiff> RoutingTable after ); - CheckedRunnable getIndexRoutingAsyncAction( + void getIndexRoutingAsyncAction( ClusterState clusterState, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener, diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java new file mode 100644 index 0000000000000..dc301635c4a80 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +import java.util.HashMap; +import java.util.Map; + +/** + * An abstract class that provides a base implementation for managing remote entities in the remote store. + */ +public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager { + /** + * A map that stores the remote writable entity stores, keyed by the entity type. + */ + protected final Map remoteWritableEntityStores = new HashMap<>(); + + /** + * Retrieves the remote writable entity store for the given entity. + * + * @param entity the entity for which the store is requested + * @return the remote writable entity store for the given entity + * @throws IllegalArgumentException if the entity type is unknown + */ + protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { + RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); + if (remoteStore == null) { + throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); + } + return remoteStore; + } + + /** + * Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener. + * + * @param component the component for which the write operation is performed + * @param remoteEntity the remote object to be written + * @param listener the listener to be notified when the write operation completes + * @return an ActionListener for handling the write operation + */ + protected abstract ActionListener getWrappedWriteListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener + ); + + /** + * Returns an ActionListener for handling the read operation for the specified component, + * remote object, and latched action listener. + * + * @param component the component for which the read operation is performed + * @param remoteEntity the remote object to be read + * @param listener the listener to be notified when the read operation completes + * @return an ActionListener for handling the read operation + */ + protected abstract ActionListener getWrappedReadListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener + ); + + @Override + public void writeAsync( + String component, + AbstractRemoteWritableBlobEntity entity, + ActionListener listener + ) { + getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener)); + } + + @Override + public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener) { + getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener)); + } +} diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java new file mode 100644 index 0000000000000..7693d1b5284bd --- /dev/null +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +/** + * The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store + */ +public interface RemoteWritableEntityManager { + + /** + * Performs an asynchronous read operation for the specified component and entity. + * + * @param component the component for which the read operation is performed + * @param entity the entity to be read + * @param listener the listener to be notified when the read operation completes. + * The listener's {@link ActionListener#onResponse(Object)} method + * is called with a {@link RemoteReadResult} object containing the + * read data on successful read. The + * {@link ActionListener#onFailure(Exception)} method is called with + * an exception if the read operation fails. + */ + void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener); + + /** + * Performs an asynchronous write operation for the specified component and entity. + * + * @param component the component for which the write operation is performed + * @param entity the entity to be written + * @param listener the listener to be notified when the write operation completes. + * The listener's {@link ActionListener#onResponse(Object)} method + * is called with a {@link UploadedMetadata} object containing the + * uploaded metadata on successful write. The + * {@link ActionListener#onFailure(Exception)} method is called with + * an exception if the write operation fails. + */ + void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 8f986423587d7..67ac8d2b9a810 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -8,13 +8,11 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.common.remote.RemoteWritableEntityStore; +import org.opensearch.common.remote.AbstractRemoteWritableEntityManager; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; @@ -26,9 +24,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Map; /** @@ -36,13 +32,11 @@ * * @opensearch.internal */ -public class RemoteClusterStateAttributesManager { +public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableEntityManager { public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; public static final String DISCOVERY_NODES = "nodes"; public static final String CLUSTER_BLOCKS = "blocks"; public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; - private final Map remoteWritableEntityStores; - private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( String clusterName, @@ -51,8 +45,6 @@ public class RemoteClusterStateAttributesManager { NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadpool ) { - this.namedWriteableRegistry = namedWriteableRegistry; - this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteClusterStateBlobStore<>( @@ -85,46 +77,28 @@ public class RemoteClusterStateAttributesManager { ); } - /** - * Allows async upload of Cluster State Attribute components to remote - */ - CheckedRunnable getAsyncMetadataWriteAction( + @Override + protected ActionListener getWrappedWriteListener( String component, - AbstractRemoteWritableBlobEntity blobEntity, - LatchedActionListener latchedActionListener - ) { - return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener)); - } - - private ActionListener getActionListener( - String component, - AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex)) + resp -> listener.onResponse(remoteEntity.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex)) ); } - private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { - RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); - if (remoteStore == null) { - throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); - } - return remoteStore; - } - - public CheckedRunnable getAsyncMetadataReadAction( + @Override + protected ActionListener getWrappedReadListener( String component, - AbstractRemoteWritableBlobEntity blobEntity, - LatchedActionListener listener + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener ) { - final ActionListener actionListener = ActionListener.wrap( + return ActionListener.wrap( response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), - listener::onFailure + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex)) ); - return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); } public DiffableUtils.MapDiff> getUpdatedCustoms( @@ -158,4 +132,5 @@ public DiffableUtils.MapDiff> uploadTasks = new ConcurrentHashMap<>(totalUploadTasks); + List uploadTasks = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); Map results = new ConcurrentHashMap<>(totalUploadTasks); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); @@ -517,170 +516,158 @@ UploadedMetadataResults writeMetadataInParallel( ); if (uploadSettingsMetadata) { - uploadTasks.put( + uploadTasks.add(SETTING_METADATA); + remoteGlobalMetadataManager.writeAsync( SETTING_METADATA, - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - new RemotePersistentSettingsMetadata( - clusterState.metadata().persistentSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemotePersistentSettingsMetadata( + clusterState.metadata().persistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadTransientSettingMetadata) { - uploadTasks.put( + uploadTasks.add(TRANSIENT_SETTING_METADATA); + remoteGlobalMetadataManager.writeAsync( TRANSIENT_SETTING_METADATA, - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - new RemoteTransientSettingsMetadata( - clusterState.metadata().transientSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteTransientSettingsMetadata( + clusterState.metadata().transientSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadCoordinationMetadata) { - uploadTasks.put( + uploadTasks.add(COORDINATION_METADATA); + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - new RemoteCoordinationMetadata( - clusterState.metadata().coordinationMetadata(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteCoordinationMetadata( + clusterState.metadata().coordinationMetadata(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadTemplateMetadata) { - uploadTasks.put( + uploadTasks.add(TEMPLATES_METADATA); + remoteGlobalMetadataManager.writeAsync( TEMPLATES_METADATA, - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - new RemoteTemplatesMetadata( - clusterState.metadata().templatesMetadata(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteTemplatesMetadata( + clusterState.metadata().templatesMetadata(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadDiscoveryNodes) { - uploadTasks.put( - DISCOVERY_NODES, - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( - RemoteDiscoveryNodes.DISCOVERY_NODES, - new RemoteDiscoveryNodes( - clusterState.nodes(), - clusterState.version(), - clusterState.stateUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + uploadTasks.add(DISCOVERY_NODES); + remoteClusterStateAttributesManager.writeAsync( + RemoteDiscoveryNodes.DISCOVERY_NODES, + new RemoteDiscoveryNodes( + clusterState.nodes(), + clusterState.version(), + clusterState.stateUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } if (uploadClusterBlock) { - uploadTasks.put( - CLUSTER_BLOCKS, - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( - RemoteClusterBlocks.CLUSTER_BLOCKS, - new RemoteClusterBlocks( - clusterState.blocks(), - clusterState.version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + uploadTasks.add(CLUSTER_BLOCKS); + remoteClusterStateAttributesManager.writeAsync( + RemoteClusterBlocks.CLUSTER_BLOCKS, + new RemoteClusterBlocks( + clusterState.blocks(), + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } if (uploadHashesOfConsistentSettings) { - uploadTasks.put( + uploadTasks.add(HASHES_OF_CONSISTENT_SETTINGS); + remoteGlobalMetadataManager.writeAsync( HASHES_OF_CONSISTENT_SETTINGS, - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - new RemoteHashesOfConsistentSettings( - (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + new RemoteHashesOfConsistentSettings( + (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } customToUpload.forEach((key, value) -> { String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); - uploadTasks.put( + uploadTasks.add(customComponent); + remoteGlobalMetadataManager.writeAsync( customComponent, - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( - new RemoteCustomMetadata( - value, - key, - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + new RemoteCustomMetadata( + value, + key, + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); }); indexToUpload.forEach(indexMetadata -> { - uploadTasks.put( + uploadTasks.add(indexMetadata.getIndex().getName()); + remoteIndexMetadataManager.writeAsync( indexMetadata.getIndex().getName(), - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction(indexMetadata, clusterState.metadata().clusterUUID(), listener) + new RemoteIndexMetadata( + indexMetadata, + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); }); clusterStateCustomToUpload.forEach((key, value) -> { - uploadTasks.put( - key, - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( - CLUSTER_STATE_CUSTOM, - new RemoteClusterStateCustoms( - value, - key, - clusterState.version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + uploadTasks.add(key); + remoteClusterStateAttributesManager.writeAsync( + CLUSTER_STATE_CUSTOM, + new RemoteClusterStateCustoms( + value, + key, + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); }); indicesRoutingToUpload.forEach(indexRoutingTable -> { - uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), - remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - indexRoutingTable, - listener, - getClusterMetadataBasePath( - blobStoreRepository, - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ) + uploadTasks.add(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName()); + remoteRoutingTableService.getIndexRoutingAsyncAction( + clusterState, + indexRoutingTable, + listener, + getClusterMetadataBasePath( + blobStoreRepository, + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() ) ); }); - - // start async upload of all required metadata files - for (CheckedRunnable uploadTask : uploadTasks.values()) { - uploadTask.run(); - } invokeIndexMetadataUploadListeners(indexToUpload, prevIndexMetadataByName, latch, exceptionList); try { @@ -690,7 +677,7 @@ UploadedMetadataResults writeMetadataInParallel( String.format( Locale.ROOT, "Timed out waiting for transfer of following metadata to complete - %s", - String.join(", ", uploadTasks.keySet()) + String.join(", ", uploadTasks) ) ); exceptionList.forEach(ex::addSuppressed); @@ -699,11 +686,7 @@ UploadedMetadataResults writeMetadataInParallel( } catch (InterruptedException ex) { exceptionList.forEach(ex::addSuppressed); RemoteStateTransferException exception = new RemoteStateTransferException( - String.format( - Locale.ROOT, - "Timed out waiting for transfer of metadata to complete - %s", - String.join(", ", uploadTasks.keySet()) - ), + String.format(Locale.ROOT, "Timed out waiting for transfer of metadata to complete - %s", String.join(", ", uploadTasks)), ex ); Thread.currentThread().interrupt(); @@ -711,14 +694,20 @@ UploadedMetadataResults writeMetadataInParallel( } if (!exceptionList.isEmpty()) { RemoteStateTransferException exception = new RemoteStateTransferException( + String.format(Locale.ROOT, "Exception during transfer of following metadata to Remote - %s", String.join(", ", uploadTasks)) + ); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + if (results.size() != uploadTasks.size()) { + throw new RemoteStateTransferException( String.format( Locale.ROOT, - "Exception during transfer of following metadata to Remote - %s", - String.join(", ", uploadTasks.keySet()) + "Some metadata components were not uploaded successfully. Objects to be uploaded: %s, uploaded objects: %s", + String.join(", ", uploadTasks), + String.join(", ", results.keySet()) ) ); - exceptionList.forEach(exception::addSuppressed); - throw exception; } UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { @@ -1001,7 +990,6 @@ ClusterState readClusterStateInParallel( + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size() + indicesRoutingToRead.size(); CountDownLatch latch = new CountDownLatch(totalReadTasks); - List> asyncMetadataReadActions = new ArrayList<>(); List readResults = Collections.synchronizedList(new ArrayList<>()); List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); @@ -1015,8 +1003,15 @@ ClusterState readClusterStateInParallel( }), latch); for (UploadedIndexMetadata indexMetadata : indicesToRead) { - asyncMetadataReadActions.add( - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(clusterUUID, indexMetadata.getUploadedFilename(), listener) + remoteIndexMetadataManager.readAsync( + indexMetadata.getIndexName(), + new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } @@ -1032,154 +1027,130 @@ ClusterState readClusterStateInParallel( ); for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { - asyncMetadataReadActions.add( - remoteRoutingTableService.getAsyncIndexRoutingReadAction( - indexRouting.getUploadedFilename(), - new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), - routingTableLatchedActionListener - ) + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + indexRouting.getUploadedFilename(), + new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), + routingTableLatchedActionListener ); } for (Map.Entry entry : customToRead.entrySet()) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - new RemoteCustomMetadata( - entry.getValue().getUploadedFilename(), - entry.getKey(), - clusterUUID, - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - entry.getValue().getAttributeName(), - listener - ) + remoteGlobalMetadataManager.readAsync( + entry.getValue().getAttributeName(), + new RemoteCustomMetadata( + entry.getValue().getUploadedFilename(), + entry.getKey(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); } if (readCoordinationMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - new RemoteCoordinationMetadata( - manifest.getCoordinationMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - COORDINATION_METADATA, - listener - ) + remoteGlobalMetadataManager.readAsync( + COORDINATION_METADATA, + new RemoteCoordinationMetadata( + manifest.getCoordinationMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readSettingsMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - new RemotePersistentSettingsMetadata( - manifest.getSettingsMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - SETTING_METADATA, - listener - ) + remoteGlobalMetadataManager.readAsync( + SETTING_METADATA, + new RemotePersistentSettingsMetadata( + manifest.getSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readTransientSettingsMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - new RemoteTransientSettingsMetadata( - manifest.getTransientSettingsMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - TRANSIENT_SETTING_METADATA, - listener - ) + remoteGlobalMetadataManager.readAsync( + TRANSIENT_SETTING_METADATA, + new RemoteTransientSettingsMetadata( + manifest.getTransientSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readTemplatesMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - new RemoteTemplatesMetadata( - manifest.getTemplatesMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - TEMPLATES_METADATA, - listener - ) + remoteGlobalMetadataManager.readAsync( + TEMPLATES_METADATA, + new RemoteTemplatesMetadata( + manifest.getTemplatesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readDiscoveryNodes) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( - DISCOVERY_NODES, - new RemoteDiscoveryNodes( - manifest.getDiscoveryNodesMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + DISCOVERY_NODES, + new RemoteDiscoveryNodes( + manifest.getDiscoveryNodesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } if (readClusterBlocks) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( - CLUSTER_BLOCKS, - new RemoteClusterBlocks( - manifest.getClusterBlocksMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + CLUSTER_BLOCKS, + new RemoteClusterBlocks( + manifest.getClusterBlocksMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } if (readHashesOfConsistentSettings) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - new RemoteHashesOfConsistentSettings( - manifest.getHashesOfConsistentSettings().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - HASHES_OF_CONSISTENT_SETTINGS, - listener - ) + remoteGlobalMetadataManager.readAsync( + HASHES_OF_CONSISTENT_SETTINGS, + new RemoteHashesOfConsistentSettings( + manifest.getHashesOfConsistentSettings().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( - // pass component name as cluster-state-custom--, so that we can interpret it later - String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), - new RemoteClusterStateCustoms( - entry.getValue().getUploadedFilename(), - entry.getValue().getAttributeName(), - clusterUUID, - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + // pass component name as cluster-state-custom--, so that we can interpret it later + String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), + new RemoteClusterStateCustoms( + entry.getValue().getUploadedFilename(), + entry.getValue().getAttributeName(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); } - for (CheckedRunnable asyncMetadataReadAction : asyncMetadataReadActions) { - asyncMetadataReadAction.run(); - } - try { if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { RemoteStateTransferException exception = new RemoteStateTransferException( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 2c5aad99adc0c..5a6f4b7e9f1f1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -8,7 +8,6 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; @@ -17,9 +16,8 @@ import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.common.remote.RemoteWritableEntityStore; +import org.opensearch.common.remote.AbstractRemoteWritableEntityManager; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -43,7 +41,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; @@ -56,7 +53,7 @@ * * @opensearch.internal */ -public class RemoteGlobalMetadataManager { +public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityManager { public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -70,7 +67,6 @@ public class RemoteGlobalMetadataManager { public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; private volatile TimeValue globalMetadataUploadTimeout; - private Map remoteWritableEntityStores; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; private final NamedWriteableRegistry namedWriteableRegistry; @@ -87,7 +83,6 @@ public class RemoteGlobalMetadataManager { this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.namedWriteableRegistry = namedWriteableRegistry; - this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, new RemoteClusterStateBlobStore<>( @@ -161,46 +156,28 @@ public class RemoteGlobalMetadataManager { clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); } - /** - * Allows async upload of Metadata components to remote - */ - CheckedRunnable getAsyncMetadataWriteAction( - AbstractRemoteWritableBlobEntity writeEntity, - LatchedActionListener latchedActionListener - ) { - return (() -> getStore(writeEntity).writeAsync(writeEntity, getActionListener(writeEntity, latchedActionListener))); - } - - private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { - RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); - if (remoteStore == null) { - throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); - } - return remoteStore; - } - - private ActionListener getActionListener( - AbstractRemoteWritableBlobEntity remoteBlobStoreObject, - LatchedActionListener latchedActionListener + @Override + protected ActionListener getWrappedWriteListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteBlobStoreObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure( - new RemoteStateTransferException("Upload failed for " + remoteBlobStoreObject.getType(), ex) - ) + resp -> listener.onResponse(remoteEntity.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex)) ); } - CheckedRunnable getAsyncMetadataReadAction( - AbstractRemoteWritableBlobEntity readEntity, - String componentName, - LatchedActionListener listener + @Override + protected ActionListener getWrappedReadListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener ) { - ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)), - listener::onFailure + return ActionListener.wrap( + response -> listener.onResponse(new RemoteReadResult(response, remoteEntity.getType(), component)), + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex)) ); - return () -> getStore(readEntity).readAsync(readEntity, actionListener); } Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index c595f19279354..c30721c8f625c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -8,10 +8,9 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.remote.RemoteWritableEntityStore; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.AbstractRemoteWritableEntityManager; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; @@ -33,7 +32,7 @@ * * @opensearch.internal */ -public class RemoteIndexMetadataManager { +public class RemoteIndexMetadataManager extends AbstractRemoteWritableEntityManager { public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -45,7 +44,6 @@ public class RemoteIndexMetadataManager { Setting.Property.Deprecated ); - private final RemoteWritableEntityStore indexMetadataBlobStore; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; @@ -58,12 +56,15 @@ public RemoteIndexMetadataManager( BlobStoreTransferService blobStoreTransferService, ThreadPool threadpool ) { - this.indexMetadataBlobStore = new RemoteClusterStateBlobStore<>( - blobStoreTransferService, - blobStoreRepository, - clusterName, - threadpool, - ThreadPool.Names.REMOTE_STATE_READ + this.remoteWritableEntityStores.put( + RemoteIndexMetadata.INDEX, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) ); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.compressor = blobStoreRepository.getCompressor(); @@ -71,45 +72,6 @@ public RemoteIndexMetadataManager( clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); } - /** - * Allows async Upload of IndexMetadata to remote - * - * @param indexMetadata {@link IndexMetadata} to upload - * @param latchedActionListener listener to respond back on after upload finishes - */ - CheckedRunnable getAsyncIndexMetadataWriteAction( - IndexMetadata indexMetadata, - String clusterUUID, - LatchedActionListener latchedActionListener - ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); - ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteIndexMetadata.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().getName(), ex)) - ); - return () -> indexMetadataBlobStore.writeAsync(remoteIndexMetadata, completionListener); - } - - CheckedRunnable getAsyncIndexMetadataReadAction( - String clusterUUID, - String uploadedFilename, - LatchedActionListener latchedActionListener - ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata( - RemoteClusterStateUtils.getFormattedIndexFileName(uploadedFilename), - clusterUUID, - compressor, - namedXContentRegistry - ); - ActionListener actionListener = ActionListener.wrap( - response -> latchedActionListener.onResponse( - new RemoteReadResult(response, RemoteIndexMetadata.INDEX, response.getIndex().getName()) - ), - latchedActionListener::onFailure - ); - return () -> indexMetadataBlobStore.readAsync(remoteIndexMetadata, actionListener); - } - /** * Fetch index metadata from remote cluster state * @@ -124,7 +86,7 @@ IndexMetadata getIndexMetadata(ClusterMetadataManifest.UploadedIndexMetadata upl namedXContentRegistry ); try { - return indexMetadataBlobStore.read(remoteIndexMetadata); + return (IndexMetadata) getStore(remoteIndexMetadata).read(remoteIndexMetadata); } catch (IOException e) { throw new IllegalStateException( String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), @@ -141,4 +103,27 @@ private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeo this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; } + @Override + protected ActionListener getWrappedWriteListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener + ) { + return ActionListener.wrap( + resp -> listener.onResponse(remoteEntity.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteEntity, ex)) + ); + } + + @Override + protected ActionListener getWrappedReadListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener + ) { + return ActionListener.wrap( + response -> listener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)), + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteEntity, ex)) + ); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 860644ab732cd..0136a025ea64b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -19,7 +19,6 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; @@ -31,6 +30,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.Index; @@ -57,6 +57,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; import org.mockito.ArgumentCaptor; @@ -347,23 +348,23 @@ public void testGetIndicesRoutingMapDiffIndexDeleted() { assertEquals(indexName, diff.getDeletes().get(0)); } - public void testGetIndexRoutingAsyncAction() throws IOException { + public void testGetIndexRoutingAsyncAction() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.getIndexRoutingAsyncAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); + latch.await(); String expectedFilePrefix = String.join( DELIMITER, @@ -372,27 +373,32 @@ public void testGetIndexRoutingAsyncAction() throws IOException { RemoteStoreUtils.invertLong(clusterState.version()) ); verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class)); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertTrue(listener.getResult() instanceof ClusterMetadataManifest.UploadedIndexMetadata); + ClusterMetadataManifest.UploadedIndexMetadata metadata = (ClusterMetadataManifest.UploadedIndexMetadata) listener.getResult(); + assertEquals(indexName, metadata.getIndexName()); } - public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException { + public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); + IOException exception = new IOException("testing failure"); + doThrow(exception).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.getIndexRoutingAsyncAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); + latch.await(); String expectedFilePrefix = String.join( DELIMITER, INDEX_ROUTING_FILE_PREFIX, @@ -400,15 +406,19 @@ public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException RemoteStoreUtils.invertLong(clusterState.version()) ); verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); + assertNotNull(listener.getFailure()); + assertNull(listener.getResult()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(exception, listener.getFailure().getCause()); } - public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { + public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); blobContainer = mock(AsyncMultiStreamBlobContainer.class); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); @@ -424,14 +434,13 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.getIndexRoutingAsyncAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); + latch.await(); String expectedFilePrefix = String.join( DELIMITER, @@ -439,35 +448,39 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { RemoteStoreUtils.invertLong(clusterState.term()), RemoteStoreUtils.invertLong(clusterState.version()) ); - assertEquals(1, actionListenerArgumentCaptor.getAllValues().size()); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); assertEquals(1, writeContextArgumentCaptor.getAllValues().size()); assertNotNull(capturedWriteContext.get("index_routing")); assertEquals(capturedWriteContext.get("index_routing").getWritePriority(), WritePriority.URGENT); assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix)); } - public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException { + public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); blobContainer = mock(AsyncMultiStreamBlobContainer.class); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - - doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer) + IOException exception = new IOException("Testing failure"); + doThrow(exception).when((AsyncMultiStreamBlobContainer) blobContainer) .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.getIndexRoutingAsyncAction( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); + latch.await(); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(exception, listener.getFailure().getCause()); } public void testGetAllUploadedIndicesRouting() { @@ -646,7 +659,8 @@ public void testGetAsyncIndexMetadataReadAction() throws Exception { String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); Index index = new Index(indexName, "uuid-01"); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(any())).thenReturn(blobContainer); BytesStreamOutput streamOutput = new BytesStreamOutput(); RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( @@ -656,12 +670,13 @@ public void testGetAsyncIndexMetadataReadAction() throws Exception { when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, new LatchedActionListener<>(listener, latch)); + latch.await(); - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + verify(blobContainer, times(1)).readBlob(any()); + assertNotNull(listener.getResult()); + assertNull(listener.getFailure()); + assertTrue(listener.getResult() instanceof IndexRoutingTable); } public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { @@ -670,7 +685,8 @@ public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); Index index = new Index("incorrect-index", "uuid-01"); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(any())).thenReturn(blobContainer); BytesStreamOutput streamOutput = new BytesStreamOutput(); RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable( @@ -680,12 +696,12 @@ public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); remoteRoutingTableService.doStart(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, new LatchedActionListener<>(listener, latch)); + latch.await(); - assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); - assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); + verify(blobContainer, times(1)).readBlob(any()); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); } public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { @@ -693,16 +709,20 @@ public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Except String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); Index index = new Index(indexName, "uuid-01"); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(any())).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); + IOException exception = new IOException("testing failure"); + doThrow(exception).when(blobContainer).readBlob(indexName); remoteRoutingTableService.doStart(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, new LatchedActionListener<>(listener, latch)); + latch.await(); - assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class))); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(exception, listener.getFailure().getCause()); } public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { diff --git a/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java new file mode 100644 index 0000000000000..3d10bbf59f6ad --- /dev/null +++ b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AbstractRemoteWritableEntityManagerTests extends OpenSearchTestCase { + public void testGetStoreWithKnownEntityType() { + AbstractRemoteWritableEntityManager manager = new ConcreteRemoteWritableEntityManager(); + String knownEntityType = "knownType"; + RemoteWritableEntityStore mockStore = mock(RemoteWritableEntityStore.class); + manager.remoteWritableEntityStores.put(knownEntityType, mockStore); + AbstractRemoteWritableBlobEntity mockEntity = mock(AbstractRemoteWritableBlobEntity.class); + when(mockEntity.getType()).thenReturn(knownEntityType); + + RemoteWritableEntityStore store = manager.getStore(mockEntity); + verify(mockEntity).getType(); + assertEquals(mockStore, store); + } + + public void testGetStoreWithUnknownEntityType() { + AbstractRemoteWritableEntityManager manager = new ConcreteRemoteWritableEntityManager(); + String unknownEntityType = "unknownType"; + AbstractRemoteWritableBlobEntity mockEntity = mock(AbstractRemoteWritableBlobEntity.class); + when(mockEntity.getType()).thenReturn(unknownEntityType); + + assertThrows(IllegalArgumentException.class, () -> manager.getStore(mockEntity)); + verify(mockEntity, times(2)).getType(); + } + + private static class ConcreteRemoteWritableEntityManager extends AbstractRemoteWritableEntityManager { + @Override + protected ActionListener getWrappedWriteListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener + ) { + return null; + } + + @Override + protected ActionListener getWrappedReadListener( + String component, + AbstractRemoteWritableBlobEntity remoteEntity, + ActionListener listener + ) { + return null; + } + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 3f2edd1a6c5a5..4ef459e6657a1 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -107,7 +107,7 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); doAnswer(invocationOnMock -> { @@ -117,11 +117,7 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( - DISCOVERY_NODES, - remoteDiscoveryNodes, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.writeAsync(DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -140,7 +136,7 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( @@ -149,11 +145,7 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( - DISCOVERY_NODES, - remoteObjForDownload, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.readAsync(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -165,7 +157,7 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.getClusterManagerNodeId()); } - public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(clusterBlocks, VERSION, CLUSTER_UUID, compressor); doAnswer(invocationOnMock -> { @@ -175,11 +167,7 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( - CLUSTER_BLOCKS, - remoteClusterBlocks, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.writeAsync(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -198,7 +186,7 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( @@ -208,11 +196,7 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( - CLUSTER_BLOCKS, - remoteClusterBlocks, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.readAsync(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -226,7 +210,7 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I } } - public void testGetAsyncMetadataWriteAction_Custom() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_Custom() throws IOException, InterruptedException { Custom custom = getClusterStateCustom(); RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( custom, @@ -243,11 +227,11 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final TestCapturingListener listener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.writeAsync( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -266,7 +250,7 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_Custom() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedException { Custom custom = getClusterStateCustom(); String fileName = randomAlphaOfLength(10); RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( @@ -281,11 +265,11 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.readAsync( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getFailure()); assertNotNull(capturingListener.getResult()); @@ -294,7 +278,7 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_Exception() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_Exception() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); @@ -307,32 +291,33 @@ public void testGetAsyncMetadataWriteAction_Exception() throws IOException, Inte TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.writeAsync( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getResult()); assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException); assertEquals(ioException, capturingListener.getFailure().getCause()); } - public void testGetAsyncMetadataReadAction_Exception() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_Exception() throws IOException, InterruptedException { String fileName = randomAlphaOfLength(10); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(fileName, CLUSTER_UUID, compressor); Exception ioException = new IOException("mock test exception"); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener capturingListener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.readAsync( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getResult()); - assertEquals(ioException, capturingListener.getFailure()); + assertEquals(ioException, capturingListener.getFailure().getCause()); + assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException); } public void testGetUpdatedCustoms() { diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 6cd9cbbf13848..0650ce5e6ca96 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -120,6 +120,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getFormattedIndexFileName; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; @@ -589,6 +590,55 @@ public void testFailWriteIncrementalMetadataWhenTermChanged() { ); } + public void testWriteMetadataInParallelIncompleteUpload() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService); + rcssSpy.start(); + RemoteIndexMetadataManager mockedIndexManager = mock(RemoteIndexMetadataManager.class); + RemoteGlobalMetadataManager mockedGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); + RemoteClusterStateAttributesManager mockedClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); + ClusterMetadataManifest.UploadedMetadata mockedUploadedMetadata = mock(ClusterMetadataManifest.UploadedMetadata.class); + rcssSpy.setRemoteIndexMetadataManager(mockedIndexManager); + rcssSpy.setRemoteGlobalMetadataManager(mockedGlobalMetadataManager); + rcssSpy.setRemoteClusterStateAttributesManager(mockedClusterStateAttributeManager); + ArgumentCaptor listenerArgumentCaptor = ArgumentCaptor.forClass(LatchedActionListener.class); + + when(mockedGlobalMetadataManager.getGlobalMetadataUploadTimeout()).thenReturn(GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT); + when(mockedUploadedMetadata.getComponent()).thenReturn("test-component"); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata); + return null; + }).when(mockedIndexManager).writeAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata); + return null; + }).when(mockedGlobalMetadataManager).writeAsync(anyString(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedUploadedMetadata); + return null; + }).when(mockedClusterStateAttributeManager).writeAsync(any(), any(), listenerArgumentCaptor.capture()); + + RemoteStateTransferException exception = expectThrows( + RemoteStateTransferException.class, + () -> rcssSpy.writeMetadataInParallel( + clusterState, + new ArrayList<>(clusterState.getMetadata().indices().values()), + emptyMap(), + clusterState.getMetadata().customs(), + true, + true, + true, + true, + true, + true, + clusterState.getCustoms(), + true, + emptyList() + ) + ); + assertTrue(exception.getMessage().startsWith("Some metadata components were not uploaded successfully")); + } + public void testWriteIncrementalMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); @@ -780,14 +830,18 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( LatchedActionListener.class ); - when(mockedIndexManager.getAsyncIndexMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedGlobalMetadataManager.getAsyncMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedClusterStateAttributeManager.getAsyncMetadataReadAction(anyString(), any(), listenerArgumentCaptor.capture())) - .thenReturn(() -> listenerArgumentCaptor.getValue().onResponse(mockedResult)); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedIndexManager).readAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedGlobalMetadataManager).readAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedClusterStateAttributeManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); RemoteClusterStateService mockService = spy(remoteClusterStateService); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); @@ -822,14 +876,18 @@ public void testGetClusterStateForManifest_ExcludeEphemeral() throws IOException ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( LatchedActionListener.class ); - when(mockedIndexManager.getAsyncIndexMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedGlobalMetadataManager.getAsyncMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedClusterStateAttributeManager.getAsyncMetadataReadAction(anyString(), any(), listenerArgumentCaptor.capture())) - .thenReturn(() -> listenerArgumentCaptor.getValue().onResponse(mockedResult)); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedIndexManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedGlobalMetadataManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedClusterStateAttributeManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); remoteClusterStateService.setRemoteIndexMetadataManager(mockedIndexManager); remoteClusterStateService.setRemoteGlobalMetadataManager(mockedGlobalMetadataManager); @@ -876,9 +934,10 @@ public void testGetClusterStateFromManifest_CodecV1() throws IOException { ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( LatchedActionListener.class ); - when(mockedIndexManager.getAsyncIndexMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(new RemoteReadResult(indexMetadata, INDEX, INDEX)) - ); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(new RemoteReadResult(indexMetadata, INDEX, INDEX)); + return null; + }).when(mockedIndexManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); when(mockedGlobalMetadataManager.getGlobalMetadata(anyString(), eq(manifest))).thenReturn(Metadata.EMPTY_METADATA); RemoteClusterStateService spiedService = spy(remoteClusterStateService); spiedService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); @@ -1257,7 +1316,7 @@ public void testReadClusterStateInParallel_ExceptionDuringRead() throws IOExcept ); assertEquals("Exception during reading cluster state from remote", exception.getMessage()); assertTrue(exception.getSuppressed().length > 0); - assertEquals(mockException, exception.getSuppressed()[0]); + assertEquals(mockException, exception.getSuppressed()[0].getCause()); } public void testReadClusterStateInParallel_UnexpectedResult() throws IOException { @@ -1321,19 +1380,20 @@ public void testReadClusterStateInParallel_UnexpectedResult() throws IOException RemoteIndexMetadataManager mockIndexMetadataManager = mock(RemoteIndexMetadataManager.class); CheckedRunnable mockRunnable = mock(CheckedRunnable.class); ArgumentCaptor> latchCapture = ArgumentCaptor.forClass(LatchedActionListener.class); - when(mockIndexMetadataManager.getAsyncIndexMetadataReadAction(anyString(), anyString(), latchCapture.capture())).thenReturn( - mockRunnable - ); + doAnswer(invocation -> { + latchCapture.getValue().onResponse(mockResult); + return null; + }).when(mockIndexMetadataManager).readAsync(anyString(), any(), latchCapture.capture()); RemoteGlobalMetadataManager mockGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); - when(mockGlobalMetadataManager.getAsyncMetadataReadAction(any(), anyString(), latchCapture.capture())).thenReturn(mockRunnable); + doAnswer(invocation -> { + latchCapture.getValue().onResponse(mockResult); + return null; + }).when(mockGlobalMetadataManager).readAsync(any(), any(), latchCapture.capture()); RemoteClusterStateAttributesManager mockClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); - when(mockClusterStateAttributeManager.getAsyncMetadataReadAction(anyString(), any(), latchCapture.capture())).thenReturn( - mockRunnable - ); - doAnswer(invocationOnMock -> { + doAnswer(invocation -> { latchCapture.getValue().onResponse(mockResult); return null; - }).when(mockRunnable).run(); + }).when(mockClusterStateAttributeManager).readAsync(anyString(), any(), latchCapture.capture()); when(mockResult.getComponent()).thenReturn("mock-result"); remoteClusterStateService.start(); remoteClusterStateService.setRemoteIndexMetadataManager(mockIndexMetadataManager); @@ -1362,56 +1422,56 @@ public void testReadClusterStateInParallel_UnexpectedResult() throws IOException ); assertEquals("Unknown component: mock-result", exception.getMessage()); newIndicesToRead.forEach( - uploadedIndexMetadata -> verify(mockIndexMetadataManager, times(1)).getAsyncIndexMetadataReadAction( - eq(previousClusterState.getMetadata().clusterUUID()), - eq(uploadedIndexMetadata.getUploadedFilename()), + uploadedIndexMetadata -> verify(mockIndexMetadataManager, times(1)).readAsync( + eq("test-index-1"), + argThat(new BlobNameMatcher(uploadedIndexMetadata.getUploadedFilename())), any() ) ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(COORDINATION_METADATA), + argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(SETTING_METADATA), + argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(TRANSIENT_SETTING_METADATA), + argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(TEMPLATES_METADATA), + argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(HASHES_OF_CONSISTENT_SETTINGS), + argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), any() ); newCustomMetadataMap.keySet().forEach(uploadedCustomMetadataKey -> { - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(uploadedCustomMetadataKey), + argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), any() ); }); - verify(mockClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockClusterStateAttributeManager, times(1)).readAsync( eq(DISCOVERY_NODES), argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), any() ); - verify(mockClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockClusterStateAttributeManager, times(1)).readAsync( eq(CLUSTER_BLOCKS), argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), any() ); newClusterStateCustoms.keySet().forEach(uploadedClusterStateCustomMetadataKey -> { - verify(mockClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockClusterStateAttributeManager, times(1)).readAsync( eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, uploadedClusterStateCustomMetadataKey)), argThat(new BlobNameMatcher(newClusterStateCustoms.get(uploadedClusterStateCustomMetadataKey).getUploadedFilename())), any() @@ -1494,131 +1554,81 @@ public void testReadClusterStateInParallel_Success() throws IOException { RemoteGlobalMetadataManager mockedGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); RemoteClusterStateAttributesManager mockedClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); - when( - mockedIndexManager.getAsyncIndexMetadataReadAction( - eq(manifest.getClusterUUID()), - eq(indexFilename), - any(LatchedActionListener.class) - ) - ).thenAnswer(invocationOnMock -> { + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(newIndexMetadata, INDEX, "test-index-1") - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(customMetadataFilename)), - eq("custom_md_3"), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(newIndexMetadata, INDEX, "test-index-1")); + return null; + }).when(mockedIndexManager) + .readAsync(eq("test-index-1"), argThat(new BlobNameMatcher(indexFilename)), any(LatchedActionListener.class)); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(customMetadata3, CUSTOM_METADATA, "custom_md_3") - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), - eq(COORDINATION_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(customMetadata3, CUSTOM_METADATA, "custom_md_3")); + return null; + }).when(mockedGlobalMetadataManager).readAsync(eq("custom_md_3"), argThat(new BlobNameMatcher(customMetadataFilename)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult(updatedCoordinationMetadata, COORDINATION_METADATA, COORDINATION_METADATA) ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), - eq(SETTING_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(COORDINATION_METADATA), argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedPersistentSettings, SETTING_METADATA, SETTING_METADATA) - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), - eq(TRANSIENT_SETTING_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedPersistentSettings, SETTING_METADATA, SETTING_METADATA)); + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(SETTING_METADATA), argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult(updatedTransientSettings, TRANSIENT_SETTING_METADATA, TRANSIENT_SETTING_METADATA) ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), - eq(TEMPLATES_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(TRANSIENT_SETTING_METADATA), argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedTemplateMetadata, TEMPLATES_METADATA, TEMPLATES_METADATA) - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), - eq(HASHES_OF_CONSISTENT_SETTINGS), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedTemplateMetadata, TEMPLATES_METADATA, TEMPLATES_METADATA)); + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(TEMPLATES_METADATA), argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult(updatedHashesOfConsistentSettings, HASHES_OF_CONSISTENT_SETTINGS, HASHES_OF_CONSISTENT_SETTINGS) ); - }); - when( - mockedClusterStateAttributeManager.getAsyncMetadataReadAction( - eq(DISCOVERY_NODES), - argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), - any() - ) - ).thenAnswer(invocationOnMock -> { + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(HASHES_OF_CONSISTENT_SETTINGS), argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedDiscoveryNodes, CLUSTER_STATE_ATTRIBUTE, DISCOVERY_NODES) - ); - }); - when( - mockedClusterStateAttributeManager.getAsyncMetadataReadAction( - eq(CLUSTER_BLOCKS), - argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedDiscoveryNodes, CLUSTER_STATE_ATTRIBUTE, DISCOVERY_NODES)); + return null; + }).when(mockedClusterStateAttributeManager) + .readAsync(eq(DISCOVERY_NODES), argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedClusterBlocks, CLUSTER_STATE_ATTRIBUTE, CLUSTER_BLOCKS) - ); - }); - when( - mockedClusterStateAttributeManager.getAsyncMetadataReadAction( - eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, updatedClusterStateCustom3.getWriteableName())), - argThat(new BlobNameMatcher(clusterStateCustomFilename)), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedClusterBlocks, CLUSTER_STATE_ATTRIBUTE, CLUSTER_BLOCKS)); + return null; + }).when(mockedClusterStateAttributeManager) + .readAsync(eq(CLUSTER_BLOCKS), argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult( updatedClusterStateCustom3, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, updatedClusterStateCustom3.getWriteableName()) ) ); - }); + return null; + }).when(mockedClusterStateAttributeManager) + .readAsync( + eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, updatedClusterStateCustom3.getWriteableName())), + argThat(new BlobNameMatcher(clusterStateCustomFilename)), + any() + ); remoteClusterStateService.start(); remoteClusterStateService.setRemoteIndexMetadataManager(mockedIndexManager); @@ -1664,56 +1674,56 @@ public void testReadClusterStateInParallel_Success() throws IOException { uploadedClusterStateCustomMap.keySet().forEach(key -> assertTrue(updatedClusterState.customs().containsKey(key))); assertEquals(updatedClusterStateCustom3, updatedClusterState.custom("custom_3")); newIndicesToRead.forEach( - uploadedIndexMetadata -> verify(mockedIndexManager, times(1)).getAsyncIndexMetadataReadAction( - eq(previousClusterState.getMetadata().clusterUUID()), - eq(uploadedIndexMetadata.getUploadedFilename()), + uploadedIndexMetadata -> verify(mockedIndexManager, times(1)).readAsync( + eq("test-index-1"), + argThat(new BlobNameMatcher(uploadedIndexMetadata.getUploadedFilename())), any() ) ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(COORDINATION_METADATA), + argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(SETTING_METADATA), + argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(TRANSIENT_SETTING_METADATA), + argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(TEMPLATES_METADATA), + argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(HASHES_OF_CONSISTENT_SETTINGS), + argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), any() ); newCustomMetadataMap.keySet().forEach(uploadedCustomMetadataKey -> { - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(uploadedCustomMetadataKey), + argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), any() ); }); - verify(mockedClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockedClusterStateAttributeManager, times(1)).readAsync( eq(DISCOVERY_NODES), argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), any() ); - verify(mockedClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockedClusterStateAttributeManager, times(1)).readAsync( eq(CLUSTER_BLOCKS), argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), any() ); newClusterStateCustoms.keySet().forEach(uploadedClusterStateCustomMetadataKey -> { - verify(mockedClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockedClusterStateAttributeManager, times(1)).readAsync( eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, uploadedClusterStateCustomMetadataKey)), argThat(new BlobNameMatcher(newClusterStateCustoms.get(uploadedClusterStateCustomMetadataKey).getUploadedFilename())), any() diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 917794ec03c3a..a2da1e8b0fdb2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -158,7 +158,7 @@ public void testGlobalMetadataUploadWaitTimeSetting() { assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); } - public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Exception { + public void testGetAsyncReadRunnable_CoordinationMetadata() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); String fileName = randomAlphaOfLength(10); RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( @@ -173,11 +173,11 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - coordinationMetadataForDownload, + remoteGlobalMetadataManager.readAsync( COORDINATION_METADATA, + coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -186,7 +186,7 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti assertEquals(COORDINATION_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Exception { + public void testGetAsyncWriteRunnable_CoordinationMetadata() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( coordinationMetadata, @@ -203,8 +203,11 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync( + COORDINATION_METADATA, + remoteCoordinationMetadata, + new LatchedActionListener<>(listener, latch) + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -224,7 +227,7 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception { + public void testGetAsyncReadRunnable_PersistentSettings() throws Exception { Settings settingsMetadata = getSettings(); String fileName = randomAlphaOfLength(10); RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( @@ -240,11 +243,7 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - persistentSettings, - SETTING_METADATA, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.readAsync(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -253,7 +252,7 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception assertEquals(SETTING_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exception { + public void testGetAsyncWriteRunnable_PersistentSettings() throws Exception { Settings settingsMetadata = getSettings(); RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( settingsMetadata, @@ -269,7 +268,7 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.writeAsync(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); @@ -290,7 +289,7 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception { + public void testGetAsyncReadRunnable_TransientSettings() throws Exception { Settings settingsMetadata = getSettings(); String fileName = randomAlphaOfLength(10); RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( @@ -306,11 +305,7 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - transientSettings, - TRANSIENT_SETTING_METADATA, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.readAsync(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -319,7 +314,7 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception { + public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { Settings settingsMetadata = getSettings(); RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( settingsMetadata, @@ -335,7 +330,7 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.writeAsync(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -355,7 +350,7 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws Exception { + public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); String fileName = randomAlphaOfLength(10); RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForDownload = new RemoteHashesOfConsistentSettings( @@ -369,11 +364,11 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - hashesOfConsistentSettingsForDownload, + remoteGlobalMetadataManager.readAsync( HASHES_OF_CONSISTENT_SETTINGS, + hashesOfConsistentSettingsForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -382,7 +377,7 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws Exception { + public void testGetAsyncWriteRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForUpload = new RemoteHashesOfConsistentSettings( hashesOfConsistentSettings, @@ -397,10 +392,11 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + remoteGlobalMetadataManager.writeAsync( + HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForUpload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -420,7 +416,7 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception { + public void testGetAsyncReadRunnable_TemplatesMetadata() throws Exception { TemplatesMetadata templatesMetadata = getTemplatesMetadata(); String fileName = randomAlphaOfLength(10); RemoteTemplatesMetadata templatesMetadataForDownload = new RemoteTemplatesMetadata( @@ -434,11 +430,11 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - templatesMetadataForDownload, + remoteGlobalMetadataManager.readAsync( TEMPLATES_METADATA, + templatesMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -447,7 +443,7 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception assertEquals(TEMPLATES_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception { + public void testGetAsyncWriteRunnable_TemplatesMetadata() throws Exception { TemplatesMetadata templatesMetadata = getTemplatesMetadata(); RemoteTemplatesMetadata templateMetadataForUpload = new RemoteTemplatesMetadata( templatesMetadata, @@ -463,8 +459,7 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -484,7 +479,7 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { + public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { Metadata.Custom customMetadata = getCustomMetadata(); String fileName = randomAlphaOfLength(10); RemoteCustomMetadata customMetadataForDownload = new RemoteCustomMetadata( @@ -499,11 +494,7 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - customMetadataForDownload, - IndexGraveyard.TYPE, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.readAsync(IndexGraveyard.TYPE, customMetadataForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -512,7 +503,7 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { assertEquals(IndexGraveyard.TYPE, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { + public void testGetAsyncWriteRunnable_CustomMetadata() throws Exception { Metadata.Custom customMetadata = getCustomMetadata(); RemoteCustomMetadata customMetadataForUpload = new RemoteCustomMetadata( customMetadata, @@ -529,8 +520,11 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync( + customMetadataForUpload.getType(), + customMetadataForUpload, + new LatchedActionListener<>(listener, latch) + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -550,7 +544,7 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { + public void testGetAsyncReadRunnable_GlobalMetadata() throws Exception { Metadata metadata = getGlobalMetadata(); String fileName = randomAlphaOfLength(10); RemoteGlobalMetadata globalMetadataForDownload = new RemoteGlobalMetadata(fileName, CLUSTER_UUID, compressor, xContentRegistry); @@ -559,11 +553,7 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - globalMetadataForDownload, - GLOBAL_METADATA, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.readAsync(GLOBAL_METADATA, globalMetadataForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -572,7 +562,7 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { assertEquals(GLOBAL_METADATA, listener.getResult().getComponentName()); } - public void testGetReadMetadataAsyncAction_IOException() throws Exception { + public void testGetAsyncReadRunnable_IOException() throws Exception { String fileName = randomAlphaOfLength(10); RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( fileName, @@ -584,18 +574,19 @@ public void testGetReadMetadataAsyncAction_IOException() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - coordinationMetadataForDownload, + remoteGlobalMetadataManager.readAsync( COORDINATION_METADATA, + coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); - assertEquals(ioException, listener.getFailure()); + assertEquals(ioException, listener.getFailure().getCause()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); } - public void testGetAsyncMetadataWriteAction_IOException() throws Exception { + public void testGetAsyncWriteRunnable_IOException() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( coordinationMetadata, @@ -613,8 +604,11 @@ public void testGetAsyncMetadataWriteAction_IOException() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync( + COORDINATION_METADATA, + remoteCoordinationMetadata, + new LatchedActionListener<>(listener, latch) + ); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); assertTrue(listener.getFailure() instanceof RemoteStateTransferException); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java index 817fc7b55d09a..76c5792677ea0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -24,6 +24,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -83,7 +84,7 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { + public void testGetAsyncWriteRunnable_Success() throws Exception { IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); BlobStore blobStore = mock(BlobStore.class); @@ -97,11 +98,11 @@ public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction( - indexMetadata, - "cluster-uuid", + remoteIndexMetadataManager.writeAsync( + INDEX, + new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); @@ -116,7 +117,7 @@ public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { assertTrue(pathTokens[6].startsWith(expectedFilePrefix)); } - public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception { + public void testGetAsyncWriteRunnable_IOFailure() throws Exception { IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); BlobStore blobStore = mock(BlobStore.class); @@ -129,18 +130,18 @@ public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction( - indexMetadata, - "cluster-uuid", + remoteIndexMetadataManager.writeAsync( + INDEX, + new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); assertTrue(listener.getFailure() instanceof RemoteStateTransferException); } - public void testGetAsyncIndexMetadataReadAction_Success() throws Exception { + public void testGetAsyncReadRunnable_Success() throws Exception { IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); String fileName = randomAlphaOfLength(10); fileName = fileName + DELIMITER + '2'; @@ -150,15 +151,18 @@ public void testGetAsyncIndexMetadataReadAction_Success() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction("cluster-uuid", fileName, new LatchedActionListener<>(listener, latch)) - .run(); + remoteIndexMetadataManager.readAsync( + INDEX, + new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), + new LatchedActionListener<>(listener, latch) + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); assertEquals(indexMetadata, listener.getResult().getObj()); } - public void testGetAsyncIndexMetadataReadAction_IOFailure() throws Exception { + public void testGetAsyncReadRunnable_IOFailure() throws Exception { String fileName = randomAlphaOfLength(10); fileName = fileName + DELIMITER + '2'; Exception exception = new IOException("testing failure"); @@ -166,12 +170,16 @@ public void testGetAsyncIndexMetadataReadAction_IOFailure() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction("cluster-uuid", fileName, new LatchedActionListener<>(listener, latch)) - .run(); + remoteIndexMetadataManager.readAsync( + INDEX, + new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), + new LatchedActionListener<>(listener, latch) + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); - assertEquals(exception, listener.getFailure()); + assertEquals(exception, listener.getFailure().getCause()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); } private IndexMetadata getIndexMetadata(String name, @Nullable Boolean writeIndex, String... aliases) {