Skip to content

Commit

Permalink
Initial commit for scale to zero
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Nov 25, 2024
1 parent 9f790ee commit de948c8
Show file tree
Hide file tree
Showing 18 changed files with 884 additions and 129 deletions.
11 changes: 11 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'node.attr.remote_store', 'true'
setting 'cluster.remote_store.state.enabled', 'true'
setting 'node.attr.remote_store.segment.repository', 'my-repository'
setting 'node.attr.remote_store.translog.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
setting 'node.attr.remote_store.state.repository', 'my-repository'
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'


}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.AckedClusterStateUpdateTask;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ack.ClusterStateUpdateResponse;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataUpdateSettingsService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -168,8 +171,66 @@ protected void clusterManagerOperation(
.masterNodeTimeout(request.clusterManagerNodeTimeout());

updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<ClusterStateUpdateResponse>() {

/**
* Handles the response from the initial settings update during scale-down operations.
* This is a critical part of the two-phase scale-down process:
* <p>
* 1. First Phase (Settings Update):
* - Updates the remove_indexing_shards setting across all nodes
* - Waits for acknowledgment from the cluster
* - Allows nodes to prepare for scale-down operations
* <p>
* 2. Second Phase (Routing Update):
* - Only triggered after successful settings propagation
* - Updates routing tables to remove primary and replica shards
* - Ensures data consistency before shard removal
* <p>
* This two-phase approach prevents data loss by ensuring all nodes are prepared
* before any shards are removed from the routing table. It's essential for maintaining
* cluster stability during scale-down operations.
*
* @param response The cluster state update response from the settings update
*/

@Override
public void onResponse(ClusterStateUpdateResponse response) {
if (response.isAcknowledged()
&& request.settings().hasValue(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS)
&& request.settings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false)) {
UpdateSettingsClusterStateUpdateRequest updateRequest = new UpdateSettingsClusterStateUpdateRequest().indices(
concreteIndices
).ackTimeout(request.timeout()).masterNodeTimeout(request.clusterManagerNodeTimeout());

clusterService.submitStateUpdateTask(
"update-routing-table-after-settings",
new AckedClusterStateUpdateTask<>(Priority.URGENT, updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}) {

@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}

@Override
public ClusterState execute(ClusterState currentState) {
return updateSettingsService.updateRoutingTableForRemoveIndexShards(
Arrays.stream(concreteIndices).map(Index::getName).toArray(String[]::new),
currentState
);
}
}
);
}
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

Expand Down
Loading

0 comments on commit de948c8

Please sign in to comment.