From 41fd1ecb125b291d0a54819d49870344d810c955 Mon Sep 17 00:00:00 2001 From: Nishant Goel Date: Mon, 8 Jul 2024 15:48:38 +0530 Subject: [PATCH] Fixing github workflows and clearing up IT file Signed-off-by: Nishant Goel --- .../replication/SegmentReplicationBaseIT.java | 7 +- .../replication/SegmentReplicationIT.java | 10 +- ...mIndexRemoteStoreSegmentReplicationIT.java | 95 +++---------------- .../index/engine/NRTReplicationEngine.java | 2 +- .../RemoteStoreReplicationSource.java | 5 +- .../replication/SegmentReplicationTarget.java | 9 +- 6 files changed, 30 insertions(+), 98 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 549b751a7efea..7c521927b4dce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import java.util.Objects; import org.apache.lucene.index.SegmentInfos; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; @@ -43,6 +42,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -246,6 +246,9 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio } protected boolean warmIndexSegmentReplicationEnabled() { - return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name()); + return Objects.equals( + IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), + IndexModule.DataLocalityType.PARTIAL.name() + ); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 4ebbedddca4fd..dc16c0c4db439 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -20,8 +20,6 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; @@ -451,8 +449,9 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected assertThat(forceMergeResponse.getFailedShards(), is(0)); assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards)); refresh(INDEX_NAME); - //skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store. - if(!warmIndexSegmentReplicationEnabled()) { + // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote + // store. + if (!warmIndexSegmentReplicationEnabled()) { verifyStoreContent(); } } @@ -961,7 +960,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } ensureGreen(INDEX_NAME); waitForSearchableDocs(docCount, primaryNode, replicaNode); - //skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store. + // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote + // store. if (!warmIndexSegmentReplicationEnabled()) { verifyStoreContent(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java index d24e53f9a4fcc..61b21df12b514 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java @@ -8,40 +8,26 @@ package org.opensearch.indices.replication; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; - import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import java.nio.file.Path; -import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; -import org.junit.After; -import org.junit.Before; -import org.opensearch.cluster.metadata.RepositoriesMetadata; -import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; + import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.node.Node; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.nio.file.Path; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; - protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; - - protected Path segmentRepoPath; - protected Path translogRepoPath; - protected boolean clusterSettingsSuppliedByTest = false; + protected Path absolutePath; @Before private void setup() { @@ -58,19 +44,13 @@ public Settings indexSettings() { @Override protected Settings nodeSettings(int nodeOrdinal) { - if (segmentRepoPath == null || translogRepoPath == null) { - segmentRepoPath = randomRepoPath().toAbsolutePath(); - translogRepoPath = randomRepoPath().toAbsolutePath(); - } - if (clusterSettingsSuppliedByTest) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build(); - } else { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) - //.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1) - .build(); + if (absolutePath == null) { + absolutePath = randomRepoPath().toAbsolutePath(); } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath)) + .build(); } @Override @@ -92,62 +72,11 @@ protected boolean warmIndexSegmentReplicationEnabled() { @After public void teardown() { - clusterSettingsSuppliedByTest = false; for (String nodeName : internalCluster().getNodeNames()) { logger.info("file cache node name is {}", nodeName); FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache(); fileCache.clear(); } - assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME); - assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME); clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get(); - clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get(); } - - public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { - Map nodeAttributes = node.getAttributes(); - String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); - - String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name); - Map settingsMap = node.getAttributes() - .keySet() - .stream() - .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) - .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key))); - - Settings.Builder settings = Settings.builder(); - settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); - settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true); - - return new RepositoryMetadata(name, type, settings.build()); - } - - public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) { - RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0]) - .state() - .metadata() - .custom(RepositoriesMetadata.TYPE); - RepositoryMetadata actualRepository = repositories.repository(repositoryName); - - final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class); - final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName); - - for (String nodeName : internalCluster().getNodeNames()) { - ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName); - DiscoveryNode node = clusterService.localNode(); - RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName); - - // Validated that all the restricted settings are entact on all the nodes. - repository.getRestrictedSystemRepositorySettings() - .stream() - .forEach( - setting -> assertEquals( - String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()), - setting.get(actualRepository.settings()), - setting.get(expectedRepository.settings()) - ) - ); - } - } - } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 4a7c804fc74d8..c63846ffccf4c 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -171,7 +171,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. // In that case we still commit into the next local generation. if (incomingGeneration != this.lastReceivedPrimaryGen) { - if(engineConfig.getIndexSettings().isStoreLocalityPartial() == false) { + if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) { flush(false, true); } translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 0bc762d1ec6de..a9f026badeee0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -117,10 +117,11 @@ public void getSegmentFiles( final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file; + assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() + : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } - if(indexShard.indexSettings().isStoreLocalityPartial()) { + if (indexShard.indexSettings().isStoreLocalityPartial()) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); return; } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index e853e0c301ba7..175bb6592af10 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import java.util.Map; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; @@ -39,6 +38,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -205,7 +205,8 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); } - private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) throws IOException { + private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) + throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd); @@ -223,9 +224,7 @@ private List getFiles(CheckpointInfoResponse checkpointInfo, .map(StoreFileMetadata::name) .collect(Collectors.toSet()); - missingFiles = diff.missing.stream() - .filter(md -> reuseFiles.contains(md.name()) == false) - .collect(Collectors.toList()); + missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList()); logger.trace( () -> new ParameterizedMessage(