Skip to content

Commit

Permalink
Fixing github workflows and clearing up IT file
Browse files Browse the repository at this point in the history
Signed-off-by: Nishant Goel <[email protected]>
  • Loading branch information
nisgoel-amazon committed Jul 8, 2024
1 parent 1ef59a0 commit 41fd1ec
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.indices.replication;

import java.util.Objects;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -43,6 +42,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -246,6 +246,9 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
}

protected boolean warmIndexSegmentReplicationEnabled() {
return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name());
return Objects.equals(
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
IndexModule.DataLocalityType.PARTIAL.name()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
Expand Down Expand Up @@ -451,8 +449,9 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
//skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store.
if(!warmIndexSegmentReplicationEnabled()) {
// skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote
// store.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
}
Expand Down Expand Up @@ -961,7 +960,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}
ensureGreen(INDEX_NAME);
waitForSearchableDocs(docCount, primaryNode, replicaNode);
//skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store.
// skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote
// store.
if (!warmIndexSegmentReplicationEnabled()) {
verifyStoreContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,26 @@

package org.opensearch.indices.replication;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import java.nio.file.Path;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.node.Node;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {

protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;
protected boolean clusterSettingsSuppliedByTest = false;
protected Path absolutePath;

@Before
private void setup() {
Expand All @@ -58,19 +44,13 @@ public Settings indexSettings() {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
if (clusterSettingsSuppliedByTest) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
} else {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
//.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1)
.build();
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}

@Override
Expand All @@ -92,62 +72,11 @@ protected boolean warmIndexSegmentReplicationEnabled() {

@After
public void teardown() {
clusterSettingsSuppliedByTest = false;
for (String nodeName : internalCluster().getNodeNames()) {
logger.info("file cache node name is {}", nodeName);
FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
fileCache.clear();
}
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
}

public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
Map<String, String> nodeAttributes = node.getAttributes();
String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));

String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
Map<String, String> settingsMap = node.getAttributes()
.keySet()
.stream()
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));

Settings.Builder settings = Settings.builder();
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);

return new RepositoryMetadata(name, type, settings.build());
}

public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);
RepositoryMetadata actualRepository = repositories.repository(repositoryName);

final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);

for (String nodeName : internalCluster().getNodeNames()) {
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
DiscoveryNode node = clusterService.localNode();
RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);

// Validated that all the restricted settings are entact on all the nodes.
repository.getRestrictedSystemRepositorySettings()
.stream()
.forEach(
setting -> assertEquals(
String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
setting.get(actualRepository.settings()),
setting.get(expectedRepository.settings())
)
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
// In that case we still commit into the next local generation.
if (incomingGeneration != this.lastReceivedPrimaryGen) {
if(engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
flush(false, true);
}
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ public void getSegmentFiles(
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file;
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial()
: "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
if(indexShard.indexSettings().isStoreLocalityPartial()) {
if (indexShard.indexSettings().isStoreLocalityPartial()) {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.indices.replication;

import java.util.Map;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
Expand Down Expand Up @@ -39,6 +38,7 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -205,7 +205,8 @@ public void startReplication(ActionListener<Void> listener) {
}, listener::onFailure);
}

private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd) throws IOException {
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd)
throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
Expand All @@ -223,9 +224,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo,
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

missingFiles = diff.missing.stream()
.filter(md -> reuseFiles.contains(md.name()) == false)
.collect(Collectors.toList());
missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList());

logger.trace(
() -> new ParameterizedMessage(
Expand Down

0 comments on commit 41fd1ec

Please sign in to comment.