From 2b0d7140f8e7c1a5a9c08ef1f03005845acadd7c Mon Sep 17 00:00:00 2001
From: Ashish Singh
Date: Tue, 16 Apr 2024 15:55:26 +0530
Subject: [PATCH] Incorporate PR review feedback

Signed-off-by: Ashish Singh
---
 ...reationPreIndexMetadataUploadListener.java |  46 --------
 .../IndexMetadataUploadInterceptor.java       |  34 ++++++
 .../remote/RemoteClusterStateService.java     |  44 ++++++--
 ...teUploadPathIndexCreationInterceptor.java} | 100 ++++++++++++------
 .../main/java/org/opensearch/node/Node.java   |  18 ++--
 .../blobstore/ChecksumBlobStoreFormat.java    |  92 ++--------------
 .../GatewayMetaStatePersistedStateTests.java  |   7 +-
 .../RemoteClusterStateServiceTests.java       |   9 +-
 8 files changed, 165 insertions(+), 185 deletions(-)
 delete mode 100644 server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java
 create mode 100644 server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java
 rename server/src/main/java/org/opensearch/index/remote/{RemoteUploadPathIndexCreationListener.java => RemoteUploadPathIndexCreationInterceptor.java} (72%)

diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java
deleted file mode 100644
index 8b8c0ca137079..0000000000000
--- a/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.gateway.remote;
-
-import org.opensearch.cluster.metadata.IndexMetadata;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Hook for running code that needs to be executed before the upload of index metadata during index creation or
- * after enabling the remote cluster statement for the first time. The listener is intended to be run in parallel and
- * async with the index metadata upload.
- *
- * @opensearch.internal
- */
-public interface IndexCreationPreIndexMetadataUploadListener {
-
-    /**
-     * This returns the additional count that needs to be added in the latch present in {@link RemoteClusterStateService}
-     * which is used to achieve parallelism and async nature of uploads for index metadata upload. The same latch is used
-     * for running pre index metadata upload listener.
-     *
-     * @param newIndexMetadataList list of index metadata of new indexes (or first time index metadata upload).
-     * @return latch count to be added by the caller.
-     */
-    int latchCount(List<IndexMetadata> newIndexMetadataList);
-
-    /**
-     * This will run the pre index metadata upload listener using the {@code newIndexMetadataList}, {@code latch} and
-     * {@code exceptionList}. This method must execute the operation in parallel and async to ensure that the cluster state
-     * upload time remains the same.
-     *
-     * @param newIndexMetadataList list of index metadata of new indexes (or first time index metadata upload).
-     * @param latch this is used for counting down once the unit of work per index is done.
-     * @param exceptionList exception if any during run will be added here and used by the caller.
-     */
-    void run(List<IndexMetadata> newIndexMetadataList, CountDownLatch latch, List<Exception> exceptionList) throws IOException;
-}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java
new file mode 100644
index 0000000000000..ebb237f670c0e
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java
@@ -0,0 +1,34 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.core.action.ActionListener;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hook for running code that needs to be executed before the upload of index metadata during index creation or
+ * after enabling the remote cluster state for the first time. The interceptor is intended to be run in parallel and
+ * async with the index metadata upload.
+ *
+ * @opensearch.internal
+ */
+public interface IndexMetadataUploadInterceptor {
+
+    /**
+     * Intercepts the index metadata upload flow with input containing index metadata of new indexes (or first time upload).
+     * The caller is expected to trigger onResponse or onFailure of the {@code ActionListener}.
+     *
+     * @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
+     * @param actionListener listener to be invoked on success or failure.
+     */
+    void interceptIndexCreation(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException;
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
index c7790ab965ca1..e271fd86720b3 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -27,6 +27,7 @@
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.io.IOUtils;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.common.util.CollectionUtils;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.xcontent.ToXContent;
 import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
@@ -160,7 +161,7 @@ public class RemoteClusterStateService implements Closeable {
     private final Settings settings;
     private final LongSupplier relativeTimeNanosSupplier;
     private final ThreadPool threadpool;
-    private final IndexCreationPreIndexMetadataUploadListener indexCreationListener;
+    private final List<IndexMetadataUploadInterceptor> indexMetadataUploadInterceptors;
     private BlobStoreRepository blobStoreRepository;
     private BlobStoreTransferService blobStoreTransferService;
     private volatile TimeValue slowWriteLoggingThreshold;
@@ -191,7 +192,7 @@ public RemoteClusterStateService(
         ClusterSettings clusterSettings,
         LongSupplier relativeTimeNanosSupplier,
         ThreadPool threadPool,
-        IndexCreationPreIndexMetadataUploadListener indexCreationListener
+        List<IndexMetadataUploadInterceptor> indexMetadataUploadInterceptors
     ) {
         assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
         this.nodeId = nodeId;
@@ -208,7 +209,7 @@ public RemoteClusterStateService(
         clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
         clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
         this.remoteStateStats = new RemotePersistenceStats();
-        this.indexCreationListener = indexCreationListener;
+        this.indexMetadataUploadInterceptors = indexMetadataUploadInterceptors;
     }

     private BlobStoreTransferService getBlobStoreTransferService() {
@@ -452,13 +453,19 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
         List<IndexMetadata> toUpload,
         List<IndexMetadata> newIndexMetadataList
     ) throws IOException {
-        assert Objects.nonNull(indexCreationListener) : "indexCreationListener can not be null";
-        int latchCount = toUpload.size() + indexCreationListener.latchCount(newIndexMetadataList);
+        assert CollectionUtils.isEmpty(indexMetadataUploadInterceptors) == false : "indexMetadataUploadInterceptors can not be empty";
+        int latchCount = toUpload.size() + indexMetadataUploadInterceptors.size();
         List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
         final CountDownLatch latch = new CountDownLatch(latchCount);
         List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());
         uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList);
-        indexCreationListener.run(newIndexMetadataList, latch, exceptionList);
+
+        for (IndexMetadataUploadInterceptor interceptor : indexMetadataUploadInterceptors) {
+            interceptor.interceptIndexCreation(
+                newIndexMetadataList,
+                getIndexMetadataUploadInterceptorListener(newIndexMetadataList, latch, exceptionList, interceptor.getClass())
+            );
+        }

         try {
             if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
@@ -499,6 +506,29 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
         return result;
     }

+    private ActionListener<Void> getIndexMetadataUploadInterceptorListener(
+        List<IndexMetadata> newIndexMetadataList,
+        CountDownLatch latch,
+        List<Exception> exceptionList,
+        Class<?> clazz
+    ) {
+        return new LatchedActionListener<>(
+            ActionListener.wrap(
+                ignored -> logger.trace(
+                    new ParameterizedMessage("{} : Intercepted {} successfully", clazz.getSimpleName(), newIndexMetadataList)
+                ),
+                ex -> {
+                    logger.error(
+                        new ParameterizedMessage("{} : Exception during interception of {}", clazz.getSimpleName(), newIndexMetadataList),
+                        ex
+                    );
+                    exceptionList.add(ex);
+                }
+            ),
+            latch
+        );
+    }
+
     private void uploadIndexMetadataAsync(
         ClusterState clusterState,
         List<UploadedIndexMetadata> result,
@@ -1177,7 +1207,7 @@ public void writeMetadataFailed() {
     /**
      * Exception for Remote state transfer.
      */
-    static class RemoteStateTransferException extends RuntimeException {
+    public static class RemoteStateTransferException extends RuntimeException {

         public RemoteStateTransferException(String errorDesc) {
             super(errorDesc);
diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java b/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java
similarity index 72%
rename from server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java
rename to server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java
index 29868b1e0ecc5..8b94a3216d0b9 100644
--- a/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java
+++ b/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java
@@ -15,11 +15,14 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
-import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.core.action.ActionListener;
-import org.opensearch.gateway.remote.IndexCreationPreIndexMetadataUploadListener;
+import org.opensearch.core.index.Index;
+import org.opensearch.gateway.remote.IndexMetadataUploadInterceptor;
 import org.opensearch.gateway.remote.RemoteClusterStateService;
+import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException;
 import org.opensearch.node.Node;
 import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
 import org.opensearch.repositories.RepositoriesService;
@@ -28,14 +31,19 @@
 import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;

+import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
 import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
 import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
 import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
@@ -45,7 +53,7 @@
  * Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
  * and {@link org.opensearch.index.remote.RemoteStoreEnums.DataType} for each shard of an index.
  */
-public class RemoteUploadPathIndexCreationListener implements IndexCreationPreIndexMetadataUploadListener {
+public class RemoteUploadPathIndexCreationInterceptor implements IndexMetadataUploadInterceptor {

     public static final ChecksumBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ChecksumBlobStoreFormat<>(
         "remote-index-path",
@@ -53,51 +61,75 @@ public class RemoteUploadPathIndexCreationListener implements IndexCreationPreIn
         RemoteIndexPath::fromXContent
     );

-    private static final Logger logger = LogManager.getLogger(RemoteUploadPathIndexCreationListener.class);
+    private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
+    private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s";
+
+    private static final Logger logger = LogManager.getLogger(RemoteUploadPathIndexCreationInterceptor.class);

     private final Settings settings;
     private final boolean isRemoteDataAttributePresent;
     private final boolean isTranslogSegmentRepoSame;
     private final Supplier<RepositoriesService> repositoriesService;
+    private volatile TimeValue indexMetadataUploadTimeout;

     private BlobStoreRepository translogRepository;
     private BlobStoreRepository segmentRepository;

-    public RemoteUploadPathIndexCreationListener(Settings settings, Supplier<RepositoriesService> repositoriesService) {
+    public RemoteUploadPathIndexCreationInterceptor(
+        Settings settings,
+        Supplier<RepositoriesService> repositoriesService,
+        ClusterSettings clusterSettings
+    ) {
         this.settings = settings;
         this.repositoriesService = repositoriesService;
         isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings);
         // If the remote data attributes are not present, then there is no effect of translog and segment being same or different or null.
         isTranslogSegmentRepoSame = isTranslogSegmentRepoSame();
+        indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
+        clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
     }

     @Override
-    public int latchCount(List<IndexMetadata> newIndexMetadataList) {
+    public void interceptIndexCreation(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException {
         if (isRemoteDataAttributePresent == false) {
-            return 0;
+            actionListener.onResponse(null);
+            return;
         }
-        int eligibleIndexCount = (int) newIndexMetadataList.stream().filter(this::uploadIndexPathFile).count();
-        return isTranslogSegmentRepoSame ? eligibleIndexCount : 2 * eligibleIndexCount;
-    }

-    @Override
-    public void run(List<IndexMetadata> newIndexMetadataList, CountDownLatch latch, List<Exception> exceptionList) throws IOException {
-        if (isRemoteDataAttributePresent == false) {
-            return;
+        List<IndexMetadata> eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList());
+        int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2);
+        CountDownLatch latch = new CountDownLatch(latchCount);
+        List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
+        for (IndexMetadata indexMetadata : eligibleList) {
+            writeIndexPathAsync(indexMetadata, latch, exceptionList);
         }
-        List<IndexMetadata> elibibleIndexMetadaList = newIndexMetadataList.stream()
-            .filter(this::uploadIndexPathFile)
-            .collect(Collectors.toList());
-        if (isTranslogSegmentRepoSame) {
-            assert latchCount(newIndexMetadataList) == elibibleIndexMetadaList.size()
-                : "Latch count is not equal to elibibleIndexMetadaList's size for path upload";
-        } else {
-            assert latchCount(newIndexMetadataList) == 2 * elibibleIndexMetadaList.size()
-                : "Latch count is not equal to (2 * elibibleIndexMetadaList's size) for path upload";
+        String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(","));
+
+        try {
+            if (latch.await(indexMetadataUploadTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
+                RemoteStateTransferException ex = new RemoteStateTransferException(
+                    String.format(Locale.ROOT, TIMEOUT_EXCEPTION_MSG, indexNames)
+                );
+                exceptionList.forEach(ex::addSuppressed);
+                actionListener.onFailure(ex);
+                return;
+            }
+        } catch (InterruptedException exception) {
+            exceptionList.forEach(exception::addSuppressed);
+            RemoteStateTransferException ex = new RemoteStateTransferException(
+                String.format(Locale.ROOT, TIMEOUT_EXCEPTION_MSG, indexNames),
+                exception
+            );
+            actionListener.onFailure(ex);
         }
-        for (IndexMetadata indexMetadata : elibibleIndexMetadaList) {
-            writeIndexPathAsync(indexMetadata, latch, exceptionList);
+        if (exceptionList.size() > 0) {
+            RemoteStateTransferException ex = new RemoteStateTransferException(
+                String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, indexNames)
+            );
+            exceptionList.forEach(ex::addSuppressed);
+            actionListener.onFailure(ex);
         }
+        actionListener.onResponse(null);
     }

     private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List<Exception> exceptionList) throws IOException {
@@ -122,9 +154,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List
                 indexUUID,
                 translogRepository.getCompressor(),
                 getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap),
-                RemoteClusterStateService.FORMAT_PARAMS,
-                true,
-                XContentType.JSON
+                RemoteClusterStateService.FORMAT_PARAMS
             );
         } else {
             // If the repositories are different, then we need to upload one file per segment and translog containing their individual
@@ -135,9 +165,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List
                 indexUUID,
                 translogRepository.getCompressor(),
                 getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH),
-                RemoteClusterStateService.FORMAT_PARAMS,
-                true,
-                XContentType.JSON
+                RemoteClusterStateService.FORMAT_PARAMS
             );

             BlobPath segmentBasePath = segmentRepository.basePath();
@@ -148,9 +176,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List
                 indexUUID,
                 segmentRepository.getCompressor(),
                 getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH),
-                RemoteClusterStateService.FORMAT_PARAMS,
-                true,
-                XContentType.JSON
+                RemoteClusterStateService.FORMAT_PARAMS
             );
         }
     }
@@ -218,7 +244,7 @@ private LatchedActionListener<Void> getUploadPathLatchedActionListener(
     * This method checks if the index metadata has attributes that calls for uploading the index path for remote store
     * uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so.
     */
-    private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
+    private boolean requiresPathUpload(IndexMetadata indexMetadata) {
         // A cluster will have remote custom metadata only if the cluster is remote store enabled from data side.
         Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
         if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) {
@@ -231,4 +257,8 @@ private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
         // We need to upload the path only if the path type for an index is hashed_prefix
         return RemoteStoreEnums.PathType.HASHED_PREFIX == RemoteStoreEnums.PathType.parseString(pathTypeStr);
     }
+
+    private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) {
+        this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 028c495523591..5e0094e438c63 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -147,7 +147,7 @@
 import org.opensearch.index.engine.EngineFactory;
 import org.opensearch.index.recovery.RemoteStoreRestoreService;
 import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
-import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener;
+import org.opensearch.index.remote.RemoteUploadPathIndexCreationInterceptor;
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.remote.filecache.FileCache;
 import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
@@ -727,9 +727,13 @@ protected Node(
                 threadPool::relativeTimeInMillis
             );
             final RemoteClusterStateService remoteClusterStateService;
-            final RemoteUploadPathIndexCreationListener indexCreationListener;
+            final RemoteUploadPathIndexCreationInterceptor indexCreationListener;
             if (isRemoteStoreClusterStateEnabled(settings)) {
-                indexCreationListener = new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceReference::get);
+                indexCreationListener = new RemoteUploadPathIndexCreationInterceptor(
+                    settings,
+                    repositoriesServiceReference::get,
+                    clusterService.getClusterSettings()
+                );
                 remoteClusterStateService = new RemoteClusterStateService(
                     nodeEnvironment.nodeId(),
                     repositoriesServiceReference::get,
@@ -737,7 +741,7 @@ protected Node(
                     clusterService.getClusterSettings(),
                     threadPool::preciseRelativeTimeInNanos,
                     threadPool,
-                    indexCreationListener
+                    List.of(indexCreationListener)
                 );
             } else {
                 remoteClusterStateService = null;
@@ -1317,7 +1321,7 @@ protected Node(
             b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog);
             b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
             b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
-            b.bind(RemoteUploadPathIndexCreationListener.class).toProvider(() -> indexCreationListener);
+            b.bind(RemoteUploadPathIndexCreationInterceptor.class).toProvider(() -> indexCreationListener);
             b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
             b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
             b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
@@ -1467,8 +1471,8 @@ public Node start() throws NodeValidationException {
         if (remoteClusterStateService != null) {
             remoteClusterStateService.start();
         }
-        final RemoteUploadPathIndexCreationListener indexCreationListener = injector.getInstance(
-            RemoteUploadPathIndexCreationListener.class
+        final RemoteUploadPathIndexCreationInterceptor indexCreationListener = injector.getInstance(
+            RemoteUploadPathIndexCreationInterceptor.class
         );
         if (indexCreationListener != null) {
             indexCreationListener.start();
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
index 1be096dd92577..3e6052a5ef820 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
@@ -57,7 +57,6 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.compress.Compressor;
-import org.opensearch.core.xcontent.MediaType;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.core.xcontent.ToXContent;
@@ -191,32 +190,9 @@ public void write(
         final String name,
         final Compressor compressor,
         final ToXContent.Params params
-    ) throws IOException {
-        write(obj, blobContainer, name, compressor, params, false, XContentType.SMILE);
-    }
-
-    /**
-     * Writes blob with resolving the blob name using {@link #blobName} method.
-     * <p>
-     * The blob will optionally by compressed.
-     *
-     * @param obj object to be serialized
-     * @param blobContainer blob container
-     * @param name blob name
-     * @param compressor whether to use compression
-     * @param params ToXContent params
-     */
-    public void write(
-        final T obj,
-        final BlobContainer blobContainer,
-        final String name,
-        final Compressor compressor,
-        final ToXContent.Params params,
-        boolean skipHeaderFooter,
-        MediaType mediaType
     ) throws IOException {
         final String blobName = blobName(name);
-        final BytesReference bytes = serialize(obj, blobName, compressor, params, skipHeaderFooter, mediaType);
+        final BytesReference bytes = serialize(obj, blobName, compressor, params);
         blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
     }

@@ -232,17 +208,7 @@ public void writeAsync(
         final ToXContent.Params params
     ) throws IOException {
         // use NORMAL priority by default
-        this.writeAsyncWithPriority(
-            obj,
-            blobContainer,
-            name,
-            compressor,
-            WritePriority.NORMAL,
-            listener,
-            params,
-            false,
-            XContentType.SMILE
-        );
+        this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.NORMAL, listener, params);
     }

     /**
@@ -260,30 +226,7 @@ public void writeAsyncWithUrgentPriority(
         ActionListener<Void> listener,
         final ToXContent.Params params
     ) throws IOException {
-        this.writeAsyncWithPriority(
-            obj,
-            blobContainer,
-            name,
-            compressor,
-            WritePriority.URGENT,
-            listener,
-            params,
-            false,
-            XContentType.SMILE
-        );
-    }
-
-    public void writeAsyncWithUrgentPriority(
-        final T obj,
-        final BlobContainer blobContainer,
-        final String name,
-        final Compressor compressor,
-        ActionListener<Void> listener,
-        final ToXContent.Params params,
-        boolean skipHeaderFooter,
-        MediaType type
-    ) throws IOException {
-        this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params, skipHeaderFooter, type);
+        this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params);
     }

     /**
@@ -305,17 +248,15 @@ private void writeAsyncWithPriority(
         final Compressor compressor,
         final WritePriority priority,
         ActionListener<Void> listener,
-        final ToXContent.Params params,
-        boolean skipHeaderFooter,
-        MediaType mediaType
+        final ToXContent.Params params
     ) throws IOException {
         if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
-            write(obj, blobContainer, name, compressor, params, skipHeaderFooter, mediaType);
+            write(obj, blobContainer, name, compressor, params);
             listener.onResponse(null);
             return;
         }
         final String blobName = blobName(name);
-        final BytesReference bytes = serialize(obj, blobName, compressor, params, skipHeaderFooter, mediaType);
+        final BytesReference bytes = serialize(obj, blobName, compressor, params);
         final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")";
         try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
             long expectedChecksum;
@@ -349,17 +290,6 @@ private void writeAsyncWithPriority(
     public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params)
         throws IOException {
-        return serialize(obj, blobName, compressor, params, false, XContentType.SMILE);
-    }
-
-    public BytesReference serialize(
-        final T obj,
-        final String blobName,
-        final Compressor compressor,
-        final ToXContent.Params params,
-        boolean skipHeaderFooter,
-        MediaType type
-    ) throws IOException {
         try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
             try (
                 OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
@@ -369,9 +299,7 @@ public BytesReference serialize(
                     BUFFER_SIZE
                 )
             ) {
-                if (skipHeaderFooter == false) {
-                    CodecUtil.writeHeader(indexOutput, codec, VERSION);
-                }
+                CodecUtil.writeHeader(indexOutput, codec, VERSION);
                 try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
                     @Override
                     public void close() throws IOException {
@@ -380,7 +308,7 @@ public void close() throws IOException {
                     }
                 };
                     XContentBuilder builder = MediaTypeRegistry.contentBuilder(
-                        type,
+                        XContentType.SMILE,
                         compressor.threadLocalOutputStream(indexOutputOutputStream)
                     )
                 ) {
@@ -388,9 +316,7 @@ public void close() throws IOException {
                     obj.toXContent(builder, params);
                     builder.endObject();
                 }
-                if (skipHeaderFooter == false) {
-                    CodecUtil.writeFooter(indexOutput);
-                }
+                CodecUtil.writeFooter(indexOutput);
             }
             return outputStream.bytes();
         }
diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
index 6a831f636df6f..ada483a6c576c 100644
--- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
+++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java
@@ -71,7 +71,7 @@
 import org.opensearch.gateway.remote.RemotePersistenceStats;
 import org.opensearch.index.recovery.RemoteStoreRestoreService;
 import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
-import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener;
+import org.opensearch.index.remote.RemoteUploadPathIndexCreationInterceptor;
 import org.opensearch.node.Node;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.fs.FsRepository;
@@ -482,14 +482,15 @@ public void testDataOnlyNodePersistence() throws Exception {
                 Collections.emptyMap(),
                 transportService.getThreadPool()
             );
+            ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
             return new RemoteClusterStateService(
                 nodeEnvironment.nodeId(),
                 repositoriesServiceSupplier,
                 settings,
-                new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                clusterSettings,
                 () -> 0L,
                 threadPool,
-                new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceSupplier)
+                List.of(new RemoteUploadPathIndexCreationInterceptor(settings, repositoriesServiceSupplier, clusterSettings))
             );
         } else {
             return null;
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
index e760b915ed260..ae42a446c84a6 100644
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -39,7 +39,7 @@
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
 import org.opensearch.index.remote.RemoteStoreUtils;
-import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener;
+import org.opensearch.index.remote.RemoteUploadPathIndexCreationInterceptor;
 import org.opensearch.indices.IndicesModule;
 import org.opensearch.repositories.FilterRepository;
 import org.opensearch.repositories.RepositoriesService;
@@ -156,7 +156,7 @@ public void setup() {
             clusterSettings,
             () -> 0L,
             threadPool,
-            new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceSupplier)
+            List.of(new RemoteUploadPathIndexCreationInterceptor(settings, repositoriesServiceSupplier, clusterSettings))
         );
     }

@@ -175,16 +175,17 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException

     public void testFailInitializationWhenRemoteStateDisabled() {
         final Settings settings = Settings.builder().build();
+        ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         assertThrows(
             AssertionError.class,
             () -> new RemoteClusterStateService(
                 "test-node-id",
                 repositoriesServiceSupplier,
                 settings,
-                new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                clusterSettings,
                 () -> 0L,
                 threadPool,
-                new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceSupplier)
+                List.of(new RemoteUploadPathIndexCreationInterceptor(settings, repositoriesServiceSupplier, clusterSettings))
             )
         );
     }
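
Reviewer note (illustrative only, not part of this patch): the sketch below shows how an additional
IndexMetadataUploadInterceptor implementation could plug into the new ActionListener-based contract and be
registered alongside the existing interceptor. The LoggingIndexMetadataUploadInterceptor class name is
hypothetical; only the IndexMetadataUploadInterceptor interface and the ActionListener<Void> contract come
from this change.

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.opensearch.cluster.metadata.IndexMetadata;
    import org.opensearch.core.action.ActionListener;
    import org.opensearch.gateway.remote.IndexMetadataUploadInterceptor;

    import java.io.IOException;
    import java.util.List;

    /**
     * Hypothetical interceptor that logs the indexes whose metadata is about to be uploaded.
     * Exactly one of onResponse/onFailure must be invoked so that RemoteClusterStateService can
     * count down the latch slot it allocates per interceptor.
     */
    public class LoggingIndexMetadataUploadInterceptor implements IndexMetadataUploadInterceptor {

        private static final Logger logger = LogManager.getLogger(LoggingIndexMetadataUploadInterceptor.class);

        @Override
        public void interceptIndexCreation(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener)
            throws IOException {
            try {
                // Per-index work goes here; keep it quick or dispatch it asynchronously so the
                // cluster state upload is not slowed down.
                indexMetadataList.forEach(indexMetadata -> logger.trace("intercepting index [{}]", indexMetadata.getIndex()));
                actionListener.onResponse(null);
            } catch (Exception e) {
                actionListener.onFailure(e);
            }
        }
    }

Registration would then pass both interceptors to the service constructor, for example
List.of(indexCreationListener, new LoggingIndexMetadataUploadInterceptor()).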