Skip to content

Commit

Permalink
Incoporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Apr 16, 2024
1 parent 723ead9 commit 2b0d714
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 185 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.List;

/**
* Hook for running code that needs to be executed before the upload of index metadata during index creation or
* after enabling the remote cluster statement for the first time. The listener is intended to be run in parallel and
* async with the index metadata upload.
*
* @opensearch.internal
*/
public interface IndexMetadataUploadInterceptor {

/**
* Intercepts the index metadata upload flow with input containing index metadata of new indexes (or first time upload).
* The caller is expected to trigger onSuccess or onFailure of the {@code ActionListener}.
*
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
void interceptIndexCreation(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand Down Expand Up @@ -160,7 +161,7 @@ public class RemoteClusterStateService implements Closeable {
private final Settings settings;
private final LongSupplier relativeTimeNanosSupplier;
private final ThreadPool threadpool;
private final IndexCreationPreIndexMetadataUploadListener indexCreationListener;
private final List<IndexMetadataUploadInterceptor> indexMetadataUploadInterceptors;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;
Expand Down Expand Up @@ -191,7 +192,7 @@ public RemoteClusterStateService(
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool,
IndexCreationPreIndexMetadataUploadListener indexCreationListener
List<IndexMetadataUploadInterceptor> indexMetadataUploadInterceptors
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
this.nodeId = nodeId;
Expand All @@ -208,7 +209,7 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.indexCreationListener = indexCreationListener;
this.indexMetadataUploadInterceptors = indexMetadataUploadInterceptors;
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -452,13 +453,19 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
List<IndexMetadata> toUpload,
List<IndexMetadata> newIndexMetadataList
) throws IOException {
assert Objects.nonNull(indexCreationListener) : "indexCreationListener can not be null";
int latchCount = toUpload.size() + indexCreationListener.latchCount(newIndexMetadataList);
assert CollectionUtils.isEmpty(indexMetadataUploadInterceptors) == false : "indexMetadataUploadInterceptors can not be empty";
int latchCount = toUpload.size() + indexMetadataUploadInterceptors.size();
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
final CountDownLatch latch = new CountDownLatch(latchCount);
List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());
uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList);
indexCreationListener.run(newIndexMetadataList, latch, exceptionList);

for (IndexMetadataUploadInterceptor interceptor : indexMetadataUploadInterceptors) {
interceptor.interceptIndexCreation(
newIndexMetadataList,
getIndexMetadataUploadInterceptorListener(newIndexMetadataList, latch, exceptionList, interceptor.getClass())
);
}

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Expand Down Expand Up @@ -499,6 +506,29 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
return result;
}

private ActionListener<Void> getIndexMetadataUploadInterceptorListener(
List<IndexMetadata> newIndexMetadataList,
CountDownLatch latch,
List<Exception> exceptionList,
Class clazz
) {
return new LatchedActionListener<>(
ActionListener.wrap(
ignored -> logger.trace(
new ParameterizedMessage("{} : Intercepted {} successfully", clazz.getSimpleName(), newIndexMetadataList)
),
ex -> {
logger.error(
new ParameterizedMessage("{} : Exception during interception of {}", clazz.getSimpleName(), newIndexMetadataList),

Check warning on line 522 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L521-L522

Added lines #L521 - L522 were not covered by tests
ex
);
exceptionList.add(ex);
}

Check warning on line 526 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L525-L526

Added lines #L525 - L526 were not covered by tests
),
latch
);
}

private void uploadIndexMetadataAsync(
ClusterState clusterState,
List<UploadedIndexMetadata> result,
Expand Down Expand Up @@ -1177,7 +1207,7 @@ public void writeMetadataFailed() {
/**
* Exception for Remote state transfer.
*/
static class RemoteStateTransferException extends RuntimeException {
public static class RemoteStateTransferException extends RuntimeException {

public RemoteStateTransferException(String errorDesc) {
super(errorDesc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.IndexCreationPreIndexMetadataUploadListener;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.IndexMetadataUploadInterceptor;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -28,14 +31,19 @@
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
Expand All @@ -45,59 +53,83 @@
* Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory}
* and {@link org.opensearch.index.remote.RemoteStoreEnums.DataType} for each shard of an index.
*/
public class RemoteUploadPathIndexCreationListener implements IndexCreationPreIndexMetadataUploadListener {
public class RemoteUploadPathIndexCreationInterceptor implements IndexMetadataUploadInterceptor {

public static final ChecksumBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ChecksumBlobStoreFormat<>(
"remote-index-path",
RemoteIndexPath.FILE_NAME_FORMAT,
RemoteIndexPath::fromXContent
);

private static final Logger logger = LogManager.getLogger(RemoteUploadPathIndexCreationListener.class);
private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s";

private static final Logger logger = LogManager.getLogger(RemoteUploadPathIndexCreationInterceptor.class);

private final Settings settings;
private final boolean isRemoteDataAttributePresent;
private final boolean isTranslogSegmentRepoSame;
private final Supplier<RepositoriesService> repositoriesService;
private volatile TimeValue indexMetadataUploadTimeout;

private BlobStoreRepository translogRepository;
private BlobStoreRepository segmentRepository;

public RemoteUploadPathIndexCreationListener(Settings settings, Supplier<RepositoriesService> repositoriesService) {
public RemoteUploadPathIndexCreationInterceptor(
Settings settings,
Supplier<RepositoriesService> repositoriesService,
ClusterSettings clusterSettings
) {
this.settings = settings;
this.repositoriesService = repositoriesService;
isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings);
// If the remote data attributes are not present, then there is no effect of translog and segment being same or different or null.
isTranslogSegmentRepoSame = isTranslogSegmentRepoSame();
indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
}

@Override
public int latchCount(List<IndexMetadata> newIndexMetadataList) {
public void interceptIndexCreation(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException {
if (isRemoteDataAttributePresent == false) {
return 0;
actionListener.onResponse(null);
return;
}
int eligibleIndexCount = (int) newIndexMetadataList.stream().filter(this::uploadIndexPathFile).count();
return isTranslogSegmentRepoSame ? eligibleIndexCount : 2 * eligibleIndexCount;
}

@Override
public void run(List<IndexMetadata> newIndexMetadataList, CountDownLatch latch, List<Exception> exceptionList) throws IOException {
if (isRemoteDataAttributePresent == false) {
return;
List<IndexMetadata> eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList());

Check warning on line 99 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L99

Added line #L99 was not covered by tests
int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2);
CountDownLatch latch = new CountDownLatch(latchCount);
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));

Check warning on line 102 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L101-L102

Added lines #L101 - L102 were not covered by tests
for (IndexMetadata indexMetadata : eligibleList) {
writeIndexPathAsync(indexMetadata, latch, exceptionList);
}
List<IndexMetadata> elibibleIndexMetadaList = newIndexMetadataList.stream()
.filter(this::uploadIndexPathFile)
.collect(Collectors.toList());
if (isTranslogSegmentRepoSame) {
assert latchCount(newIndexMetadataList) == elibibleIndexMetadaList.size()
: "Latch count is not equal to elibibleIndexMetadaList's size for path upload";
} else {
assert latchCount(newIndexMetadataList) == 2 * elibibleIndexMetadaList.size()
: "Latch count is not equal to (2 * elibibleIndexMetadaList's size) for path upload";
String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(","));

Check warning on line 106 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L104-L106

Added lines #L104 - L106 were not covered by tests

try {
if (latch.await(indexMetadataUploadTimeout.millis(), TimeUnit.MILLISECONDS) == false) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, TIMEOUT_EXCEPTION_MSG, indexNames)

Check warning on line 111 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L110-L111

Added lines #L110 - L111 were not covered by tests
);
exceptionList.forEach(ex::addSuppressed);
actionListener.onFailure(ex);
return;

Check warning on line 115 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L113-L115

Added lines #L113 - L115 were not covered by tests
}
} catch (InterruptedException exception) {
exceptionList.forEach(exception::addSuppressed);
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, TIMEOUT_EXCEPTION_MSG, indexNames),

Check warning on line 120 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L117-L120

Added lines #L117 - L120 were not covered by tests
exception
);
actionListener.onFailure(ex);
}

Check warning on line 124 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L123-L124

Added lines #L123 - L124 were not covered by tests
for (IndexMetadata indexMetadata : elibibleIndexMetadaList) {
writeIndexPathAsync(indexMetadata, latch, exceptionList);
if (exceptionList.size() > 0) {
RemoteStateTransferException ex = new RemoteStateTransferException(
String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, indexNames)

Check warning on line 127 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L126-L127

Added lines #L126 - L127 were not covered by tests
);
exceptionList.forEach(ex::addSuppressed);
actionListener.onFailure(ex);

Check warning on line 130 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L129-L130

Added lines #L129 - L130 were not covered by tests
}
actionListener.onResponse(null);
}

Check warning on line 133 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L132-L133

Added lines #L132 - L133 were not covered by tests

private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List<Exception> exceptionList) throws IOException {
Expand All @@ -122,9 +154,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List
indexUUID,
translogRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap),

Check warning on line 156 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L155-L156

Added lines #L155 - L156 were not covered by tests
RemoteClusterStateService.FORMAT_PARAMS,
true,
XContentType.JSON
RemoteClusterStateService.FORMAT_PARAMS
);
} else {

Check warning on line 159 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L159

Added line #L159 was not covered by tests
// If the repositories are different, then we need to upload one file per segment and translog containing their individual
Expand All @@ -135,9 +165,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List
indexUUID,
translogRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH),

Check warning on line 167 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L166-L167

Added lines #L166 - L167 were not covered by tests
RemoteClusterStateService.FORMAT_PARAMS,
true,
XContentType.JSON
RemoteClusterStateService.FORMAT_PARAMS
);

BlobPath segmentBasePath = segmentRepository.basePath();
Expand All @@ -148,9 +176,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List
indexUUID,
segmentRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH),

Check warning on line 178 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L177-L178

Added lines #L177 - L178 were not covered by tests
RemoteClusterStateService.FORMAT_PARAMS,
true,
XContentType.JSON
RemoteClusterStateService.FORMAT_PARAMS
);
}
}

Check warning on line 182 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L182

Added line #L182 was not covered by tests
Expand Down Expand Up @@ -218,7 +244,7 @@ private LatchedActionListener<Void> getUploadPathLatchedActionListener(
* This method checks if the index metadata has attributes that calls for uploading the index path for remote store
* uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so.
*/
private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
private boolean requiresPathUpload(IndexMetadata indexMetadata) {
// A cluster will have remote custom metadata only if the cluster is remote store enabled from data side.
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);

Check warning on line 249 in server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationInterceptor.java#L249

Added line #L249 was not covered by tests
if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) {
Expand All @@ -231,4 +257,8 @@ private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
// We need to upload the path only if the path type for an index is hashed_prefix
return RemoteStoreEnums.PathType.HASHED_PREFIX == RemoteStoreEnums.PathType.parseString(pathTypeStr);
}

private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) {
this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout;
}
}
Loading

0 comments on commit 2b0d714

Please sign in to comment.