Skip to content

Commit

Permalink
Merge branch 'main' into publication-setting-change
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 5, 2024
2 parents eb38647 + 2f1e209 commit c142e1d
Show file tree
Hide file tree
Showing 23 changed files with 624 additions and 103 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ BWC_VERSION:
- "2.16.0"
- "2.16.1"
- "2.17.0"
- "2.18.0"
9 changes: 3 additions & 6 deletions .github/workflows/assemble.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ jobs:
- name: Setup docker (missing on MacOS)
id: setup_docker
if: runner.os == 'macos'
uses: douglascamata/setup-docker-macos-action@main
continue-on-error: true
with:
upgrade-qemu: true
colima: v0.6.8
run: |
exit 0;
- name: Run Gradle (assemble)
if: runner.os == 'macos' && steps.setup_docker.outcome != 'success'
run: |
Expand All @@ -48,4 +45,4 @@ jobs:
- name: Run Gradle (assemble)
if: runner.os == 'macos' && steps.setup_docker.outcome == 'success'
run: |
./gradlew assemble --parallel --no-build-cache -PDISABLE_BUILD_CACHE -Druntime.java=${{ matrix.java }}
exit 0;
1 change: 1 addition & 0 deletions libs/core/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_16_0 = new Version(2160099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_2_16_1 = new Version(2160199, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_2_17_0 = new Version(2170099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_2_18_0 = new Version(2180099, org.apache.lucene.util.Version.LUCENE_9_11_1);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_12_0);
public static final Version CURRENT = V_3_0_0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -48,19 +52,27 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";
private static final String INDEX_NAME = "test-index";
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

/**
 * Resets the per-test flags before each test and randomly decides whether the remote state
 * and remote routing table path prefixes are applied by {@code nodeSettings}.
 */
@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
// Randomised so that runs cover both the prefixed and the empty-prefix settings.
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}

@Override
Expand All @@ -76,6 +88,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);

return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), isRemoteStateEnabled)
Expand All @@ -87,6 +100,19 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
)
.put(
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX.getKey(),
hasRemoteRoutingCharPrefix ? REMOTE_ROUTING_PREFIX : ""
)
.put(RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX.toString())
.put(
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING.getKey(),
PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString()
)
.build();
}

Expand Down Expand Up @@ -130,6 +156,27 @@ public void testPublication() throws Exception {
Map<String, Integer> manifestFiles = getMetadataFiles(repository, RemoteClusterMetadataManifest.MANIFEST);
assertTrue(manifestFiles.containsKey(RemoteClusterMetadataManifest.MANIFEST));

RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
assertThat(manifest.getIndices().size(), is(1));
if (hasRemoteStateCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndices()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_STATE_PREFIX));
}
}
assertThat(manifest.getIndicesRouting().size(), is(1));
if (hasRemoteRoutingCharPrefix) {
for (UploadedIndexMetadata md : manifest.getIndicesRouting()) {
assertThat(md.getUploadedFilename(), startsWith(REMOTE_ROUTING_PREFIX));
}
}

// get settings from each node and verify that it is updated
Settings settings = clusterService().getSettings();
logger.info("settings : {}", settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,17 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
internalCluster().stopAllNodes();
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
RemoteClusterStateService remoteClusterStateService = internalCluster().getInstance(
RemoteClusterStateService.class,
internalCluster().getClusterManagerName()
);
ClusterMetadataManifest manifest = remoteClusterStateService.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().metadata().clusterUUID()
).get();
for (UploadedIndexMetadata md : manifest.getIndices()) {
Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/"));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.metadata.ViewMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.DelayedAllocationService;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -479,4 +480,7 @@ public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, Shard
allocationService.setExistingShardsAllocators(existingShardsAllocators);
}

/**
 * Passes the given {@link RerouteService} on to the configured shards allocator.
 *
 * @param rerouteService the reroute service forwarded to {@code shardsAllocator.setRerouteService}
 */
public void setRerouteServiceForAllocator(RerouteService rerouteService) {
shardsAllocator.setRerouteService(rerouteService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardMovementStrategy;
Expand All @@ -49,12 +50,14 @@
import org.opensearch.cluster.routing.allocation.RebalanceParameter;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.ShardAllocationDecision;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -202,6 +205,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
private long startTime;
private RerouteService rerouteService;

public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand Down Expand Up @@ -231,6 +235,12 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
}

/**
 * Stores the {@link RerouteService} this allocator uses to schedule a follow-up reroute
 * after it has timed out. Asserts that no reroute service has been set before.
 *
 * @param rerouteService the reroute service to keep for later use
 */
@Override
public void setRerouteService(RerouteService rerouteService) {
assert this.rerouteService == null : "RerouteService is already set";
this.rerouteService = rerouteService;
}

/**
* Changes in deprecated setting SHARD_MOVE_PRIMARY_FIRST_SETTING affect value of its replacement setting SHARD_MOVEMENT_STRATEGY_SETTING.
*/
Expand Down Expand Up @@ -342,6 +352,7 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
scheduleRerouteIfAllocatorTimedOut();

final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
Expand Down Expand Up @@ -404,6 +415,20 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
}
}

/**
 * Schedules a high-priority reroute when the allocator timed out during the current run.
 * <p>
 * Does nothing when {@code allocatorTimedOut()} is false. When no {@link RerouteService} has been
 * set, the follow-up reroute is skipped instead of failing the allocation round with a
 * {@link NullPointerException}.
 */
private void scheduleRerouteIfAllocatorTimedOut() {
    if (allocatorTimedOut() == false) {
        return;
    }
    // Keep the invariant check for test runs, where assertions are enabled.
    assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
    // Assertions are disabled by default at runtime, so guard explicitly as well: an allocator
    // that was never handed a RerouteService must not abort allocate() with an NPE.
    if (rerouteService == null) {
        logger.warn("RerouteService not set to schedule reroute after allocator time out");
        return;
    }
    rerouteService.reroute(
        "reroute after balanced shards allocator timed out",
        Priority.HIGH,
        ActionListener.wrap(
            r -> logger.trace("reroute after balanced shards allocator timed out completed"),
            e -> logger.debug("reroute after balanced shards allocator timed out failed", e)
        )
    );
}

/**
* Returns the currently configured delta threshold
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing.allocation.allocator;

import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.MoveDecision;
Expand Down Expand Up @@ -73,4 +74,6 @@ public interface ShardsAllocator {
* the cluster explain API, then this method should throw a {@code UnsupportedOperationException}.
*/
ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation);

/**
 * Provides a {@link RerouteService} to this allocator.
 * The default implementation does nothing; implementations that need the service can override it.
 *
 * @param rerouteService the reroute service made available to the allocator
 */
default void setRerouteService(RerouteService rerouteService) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public String[] getBlobPathTokens() {

public abstract String generateBlobFileName();

/**
 * Generates the blob path for the remote entity by adding a custom prefix.
 * The custom prefix may be generated by any of the strategies defined in
 * {@link org.opensearch.index.remote.RemoteStoreEnums}.
 * The default implementation returns the path passed in, unchanged.
 *
 * @param blobPath the remote path on which the remote entity is to be uploaded
 * @return the remote path, after adding a custom prefix, at which the remote entity will be uploaded
 */
public BlobPath getPrefixedPath(BlobPath blobPath) {
return blobPath;
}

/**
 * Returns the value of the {@code clusterUUID} field of this entity.
 *
 * @return the cluster UUID held by this entity
 */
public String clusterUUID() {
return clusterUUID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
return obj.getPrefixedPath(blobPath);
}

public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteIndexMetadataManager;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -737,13 +738,17 @@ public void apply(Settings value, Settings current, Settings previous) {
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING,
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING,
RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING,
RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ public void cleanCaches() {

// for tests
protected ShardsBatchGatewayAllocator() {
this(DEFAULT_SHARD_BATCH_SIZE);
this(DEFAULT_SHARD_BATCH_SIZE, null);
}

protected ShardsBatchGatewayAllocator(long batchSize) {
this.rerouteService = null;
protected ShardsBatchGatewayAllocator(long batchSize, RerouteService rerouteService) {
this.rerouteService = rerouteService;
this.batchStartedAction = null;
this.primaryShardBatchAllocator = null;
this.batchStoreAction = null;
Expand Down Expand Up @@ -297,6 +297,18 @@ public void run() {
public void onComplete() {
    logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size());
    primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true);
    // No primary shard timed out, so there is no follow-up reroute to schedule.
    if (timedOutPrimaryShardIds.isEmpty()) {
        return;
    }
    logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
    assert rerouteService != null;
    rerouteService.reroute(
        "reroute after existing shards allocator timed out",
        Priority.HIGH,
        ActionListener.wrap(
            response -> logger.trace("reroute after existing shards allocator timed out completed"),
            failure -> logger.debug("reroute after existing shards allocator timed out failed", failure)
        )
    );
}
};
} else {
Expand All @@ -320,6 +332,18 @@ public void run() {
public void onComplete() {
    logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size());
    replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false);
    // No replica shard timed out, so there is no follow-up reroute to schedule.
    if (timedOutReplicaShardIds.isEmpty()) {
        return;
    }
    logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
    assert rerouteService != null;
    rerouteService.reroute(
        "reroute after existing shards allocator timed out",
        Priority.HIGH,
        ActionListener.wrap(
            response -> logger.trace("reroute after existing shards allocator timed out completed"),
            failure -> logger.debug("reroute after existing shards allocator timed out failed", failure)
        )
    );
}
};
}
Expand Down
Loading

0 comments on commit c142e1d

Please sign in to comment.