Skip to content

Commit

Permalink
Initial commit for scale to zero
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi committed Nov 10, 2024
1 parent 9f790ee commit bfa69d5
Show file tree
Hide file tree
Showing 13 changed files with 434 additions and 42 deletions.
3 changes: 3 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
setting 'path.repo', '["/tmp/my-repo"]'
setting 'node.attr.remote_store', 'true'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RemoveIndexingShardsAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
Expand Down Expand Up @@ -398,6 +399,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new RemoveIndexingShardsAllocationDecider());
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,15 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
Setting.Property.Final
);

/** Settings key of the per-index flag that requests removal of indexing shards. */
public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
/**
 * Dynamic, index-scoped boolean setting registered under {@link #SETTING_REMOVE_INDEXING_SHARDS};
 * defaults to {@code false}.
 * NOTE(review): IndexSettings declares a setting with the same key and properties; consider
 * keeping a single definition so the two cannot drift apart.
 */
public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
SETTING_REMOVE_INDEXING_SHARDS,
false,
Property.Dynamic,
Property.IndexScope
);

// Flag value handed to the constructor. When true, the constructor computes totalNumberOfShards
// as numberOfShards * numberOfSearchOnlyReplicas (primaries and regular replicas are not counted).
private final boolean isRemoveIndexingShards;
public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
static final String KEY_VERSION = "version";
static final String KEY_MAPPING_VERSION = "mapping_version";
Expand Down Expand Up @@ -742,7 +751,8 @@ private IndexMetadata(
final Map<String, RolloverInfo> rolloverInfos,
final boolean isSystem,
final int indexTotalShardsPerNodeLimit,
final Context context
final Context context,
final boolean isRemoveIndexingShards
) {

this.index = index;
Expand All @@ -759,7 +769,12 @@ private IndexMetadata(
this.numberOfShards = numberOfShards;
this.numberOfReplicas = numberOfReplicas;
this.numberOfSearchOnlyReplicas = numberOfSearchOnlyReplicas;
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
this.isRemoveIndexingShards = isRemoveIndexingShards;
if (this.isRemoveIndexingShards) {
this.totalNumberOfShards = numberOfShards * numberOfSearchOnlyReplicas;
} else {
this.totalNumberOfShards = numberOfShards * (numberOfReplicas + numberOfSearchOnlyReplicas + 1);
}
this.settings = settings;
this.mappings = Collections.unmodifiableMap(mappings);
this.customData = Collections.unmodifiableMap(customData);
Expand All @@ -783,6 +798,10 @@ private IndexMetadata(
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

/**
 * Returns the remove-indexing-shards flag this metadata was constructed with.
 * When the flag is {@code true} the constructor sets the total shard count to
 * {@code numberOfShards * numberOfSearchOnlyReplicas}.
 *
 * @return {@code true} if the flag is set on this index metadata
 */
public boolean isRemoveIndexingShards() {
return isRemoveIndexingShards;
}

public Index getIndex() {
return index;
}
Expand Down Expand Up @@ -1376,6 +1395,7 @@ public static class Builder {
private Integer routingNumShards;
private boolean isSystem;
private Context context;
private boolean isRemoveIndexingShards = false;

public Builder(String index) {
this.index = index;
Expand All @@ -1387,6 +1407,11 @@ public Builder(String index) {
this.isSystem = false;
}

/**
 * Sets the remove-indexing-shards flag that {@code build()} passes on to the
 * {@link IndexMetadata} constructor. The builder field defaults to {@code false}.
 *
 * @param isRemoveIndexingShards the flag value to store
 * @return this builder, for call chaining
 */
public Builder removeIndexingShards(boolean isRemoveIndexingShards) {
this.isRemoveIndexingShards = isRemoveIndexingShards;
return this;
}

public Builder(IndexMetadata indexMetadata) {
this.index = indexMetadata.getIndex().getName();
this.state = indexMetadata.state;
Expand Down Expand Up @@ -1653,6 +1678,18 @@ public IndexMetadata build() {
final int numberOfReplicas = INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int numberOfSearchReplicas = INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.get(settings);

/// The validation can be added at the Metadata level
/*if (isScaledToZero) {
if (numberOfSearchReplicas == 0) {
throw new IllegalArgumentException("Cannot scale to zero without search replicas");
}
if (!INDEX_REMOTE_STORE_ENABLED_SETTING.get(settings)) {
throw new IllegalArgumentException("Remote store must be enabled to scale to zero");
}
if (!INDEX_REPLICATION_TYPE_SETTING.get(settings).equals(ReplicationType.SEGMENT)) {
throw new IllegalArgumentException("Segment replication must be enabled to scale to zero");
}
}*/
int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -1765,7 +1802,8 @@ public IndexMetadata build() {
rolloverInfos,
isSystem,
indexTotalShardsPerNodeLimit,
context
context,
isRemoveIndexingShards
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,37 @@ public ClusterState execute(ClusterState currentState) {

}

// Handle the remove-indexing-shards flag only when the request sets it explicitly.
if (IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.exists(openSettings)) {
    final boolean removeIndexingShards = IndexMetadata.INDEX_REMOVE_INDEXING_SHARDS_SETTING.get(openSettings);
    if (removeIndexingShards) {
        // Validate prerequisites at cluster state level
        for (Index index : request.indices()) {
            final IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
            if (indexMetadata.getNumberOfSearchOnlyReplicas() == 0) {
                throw new IllegalArgumentException(
                    "Cannot scale to zero without search replicas for index: " + index.getName()
                );
            }
            /*if (!indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false)) {
                throw new IllegalArgumentException(
                    "Remote store must be enabled to scale to zero for index: " + index.getName());
            }*/
            // Settings#get returns null when the index carries no explicit replication type.
            // Compare constant-first so that case is rejected with the intended
            // IllegalArgumentException instead of failing with a NullPointerException.
            final String replicationType = indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE);
            if (!ReplicationType.SEGMENT.toString().equals(replicationType)) {
                throw new IllegalArgumentException(
                    "Segment replication must be enabled to scale to zero for index: " + index.getName()
                );
            }
        }
    }
    // Record the requested flag value (true or false) on every index named in the request.
    for (Index index : request.indices()) {
        final IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
        final IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata)
            .removeIndexingShards(removeIndexingShards);
        metadataBuilder.put(indexMetadataBuilder);
    }
}
if (validationErrors.size() > 0) {
ValidationException exception = new ValidationException();
exception.addValidationErrors(validationErrors);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;

/**
 * Allocation decider driven by the per-index {@code index.remove_indexing_shards.enabled} setting.
 * <p>
 * For indices where the setting is absent or {@code false} every callback answers YES. For indices
 * where it is {@code true}, only non-primary search-only shards may be allocated or remain;
 * primaries and regular replicas receive NO from {@link #canAllocate} and {@link #canRemain}.
 */
public class RemoveIndexingShardsAllocationDecider extends AllocationDecider {
    private static final Logger logger = LogManager.getLogger(RemoveIndexingShardsAllocationDecider.class);

    public static final String NAME = "remove_indexing_shards";

    /**
     * Decides whether a shard may be allocated to a node.
     *
     * @param shardRouting the shard being considered
     * @param node         the candidate node (used for logging only)
     * @param allocation   the current allocation context, used to look up index metadata
     * @return YES when the feature is disabled for the index or the shard is a non-primary
     *         search-only shard; NO otherwise
     */
    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
        final boolean removeIndexingShardsEnabled = isRemoveIndexingShardsEnabled(indexMetadata);
        logDecisionInputs("[canAllocate]", shardRouting, node, removeIndexingShardsEnabled, indexMetadata);

        if (!removeIndexingShardsEnabled) {
            return allocation.decision(Decision.YES, NAME, "remove indexing shards is not enabled");
        }

        // Only allow search replica allocation when remove_indexing_shards is enabled
        if (!shardRouting.primary() && shardRouting.isSearchOnly()) {
            return allocation.decision(Decision.YES, NAME, "search replicas are allowed");
        }

        return allocation.decision(Decision.NO, NAME, "remove indexing shards enabled: only search replicas allowed");
    }

    /**
     * Decides whether a shard may stay on the node it currently occupies.
     *
     * @param shardRouting the shard being considered
     * @param node         the node currently holding the shard (used for logging only)
     * @param allocation   the current allocation context
     * @return YES when the feature is disabled for the index or the shard is search-only;
     *         NO for primaries and for regular (non-search) replicas
     */
    @Override
    public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
        final boolean removeIndexingShardsEnabled = isRemoveIndexingShardsEnabled(indexMetadata);
        logDecisionInputs("[canRemain]", shardRouting, node, removeIndexingShardsEnabled, indexMetadata);

        if (!removeIndexingShardsEnabled) {
            return allocation.decision(Decision.YES, NAME, "remove indexing shards is not enabled");
        }

        // When remove_indexing_shards is enabled: To remove primary shards.
        // The primary is answered with NO in both cases; the presence of an active search replica
        // only changes the explanation attached to the decision.
        if (shardRouting.primary()) {
            final String reason;
            if (hasHealthySearchReplicas(shardRouting, allocation)) {
                logger.debug("Forcing removal of primary shard [{}] as search replicas are healthy", shardRouting.shardId());
                reason = "removing primary with healthy search replicas";
            } else {
                reason = "primary must be removed when remove_indexing_shards is enabled";
            }
            return allocation.decision(Decision.NO, NAME, reason);
        }

        // Allow only search replicas to remain
        if (shardRouting.isSearchOnly()) {
            return allocation.decision(Decision.YES, NAME, "search replicas are allowed");
        }

        // Remove regular replicas
        logger.debug("Forcing removal of regular replica shard [{}]", shardRouting.shardId());
        return allocation.decision(Decision.NO, NAME, "regular replicas must be removed");
    }

    /**
     * Never blocks rebalancing. When at least one index has the feature enabled, the YES decision
     * carries an explanation naming this decider; otherwise {@link Decision#ALWAYS} is returned.
     *
     * @param allocation the current allocation context
     * @return a YES-type decision in every case
     */
    @Override
    public Decision canRebalance(RoutingAllocation allocation) {
        final boolean hasRemoveIndexingShardsIndex = allocation.metadata()
            .indices()
            .values()
            .stream()
            .anyMatch(RemoveIndexingShardsAllocationDecider::isRemoveIndexingShardsEnabled);

        if (hasRemoveIndexingShardsIndex) {
            logger.debug("Allowing rebalancing to facilitate remove indexing shards");
            return allocation.decision(Decision.YES, NAME, "allowing rebalance for remove indexing shards");
        }

        return Decision.ALWAYS;
    }

    /**
     * Never blocks moving a shard away from its node. For non-search shards of an index with the
     * feature enabled, the YES decision carries an explanation naming this decider.
     *
     * @param shardRouting the shard being considered
     * @param allocation   the current allocation context
     * @return a YES-type decision in every case
     */
    @Override
    public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
        final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

        if (isRemoveIndexingShardsEnabled(indexMetadata) && !shardRouting.isSearchOnly()) {
            logger.debug("Forcing non-search shard [{}] to move away", shardRouting.shardId());
            return allocation.decision(Decision.YES, NAME, "must move away non-search shards");
        }

        return Decision.ALWAYS;
    }

    /**
     * Reads the feature flag from the index settings through the shared key constant, so the key is
     * spelled in exactly one place. A missing value counts as {@code false}.
     */
    private static boolean isRemoveIndexingShardsEnabled(IndexMetadata indexMetadata) {
        return indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOVE_INDEXING_SHARDS, false);
    }

    /**
     * Emits the debug line shared by {@link #canAllocate} and {@link #canRemain}. The settings object
     * itself is passed as a parameter, so its string form is only built when debug logging is on.
     */
    private static void logDecisionInputs(
        String phase,
        ShardRouting shardRouting,
        RoutingNode node,
        boolean removeIndexingShardsEnabled,
        IndexMetadata indexMetadata
    ) {
        logger.debug(
            "{} Shard [{}] on node [{}], removeIndexingShards=[{}], searchOnly=[{}], settings={}",
            phase,
            shardRouting.shardId(),
            node.nodeId(),
            removeIndexingShardsEnabled,
            shardRouting.isSearchOnly(),
            indexMetadata.getSettings()
        );
    }

    /**
     * Reports whether the shard's routing table lists at least one active, non-primary,
     * search-only copy.
     */
    private boolean hasHealthySearchReplicas(ShardRouting shardRouting, RoutingAllocation allocation) {
        final boolean hasHealthy = allocation.routingTable()
            .shardRoutingTable(shardRouting.shardId())
            .activeShards()
            .stream()
            .anyMatch(shard -> !shard.primary() && shard.isSearchOnly() && shard.active());

        logger.debug("Search replica check for shard [{}]: hasHealthy=[{}]", shardRouting.shardId(), hasHealthy);
        return hasHealthy;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FieldMapper.COERCE_SETTING,
Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING,
MapperService.INDEX_MAPPER_DYNAMIC_SETTING,
IndexSettings.INDEX_REMOVE_INDEXING_SHARDS_SETTING,
MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING,
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING,
Expand Down
26 changes: 26 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,15 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
 * Settings key of the per-index remove-indexing-shards flag.
 * NOTE(review): IndexMetadata declares the same key and an equivalent setting; consider keeping
 * a single definition so the two cannot drift apart.
 */
public static final String SETTING_REMOVE_INDEXING_SHARDS = "index.remove_indexing_shards.enabled";
/** Dynamic, index-scoped boolean setting registered under the key above; defaults to {@code false}. */
public static final Setting<Boolean> INDEX_REMOVE_INDEXING_SHARDS_SETTING = Setting.boolSetting(
SETTING_REMOVE_INDEXING_SHARDS,
false,
Property.Dynamic,
Property.IndexScope
);
// Latest value of the flag, written by setRemoveIndexingShards (registered as the settings update consumer).
// NOTE(review): no initial assignment from the index settings is visible here; confirm the constructor seeds it.
private volatile boolean isRemoveIndexingShards;

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -1136,6 +1145,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
mergeSchedulerConfig::setMaxThreadAndMergeCount
);
scopedSettings.addSettingsUpdateConsumer(INDEX_REMOVE_INDEXING_SHARDS_SETTING, this::setRemoveIndexingShards);
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
Expand Down Expand Up @@ -1202,6 +1212,22 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
);
}

/**
 * Settings update consumer for {@link #INDEX_REMOVE_INDEXING_SHARDS_SETTING}: stores the new
 * flag value. No prerequisite validation runs here; the checks sketched in the comment below
 * are disabled.
 *
 * @param enabled the updated value of the flag
 */
private void setRemoveIndexingShards(boolean enabled) {
/* The validation can be added at the Settings level
if (enabled) {
if (settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 0) == 0) {
throw new IllegalArgumentException("Cannot scale to zero without search replicas");
}
if (!isRemoteStoreEnabled()) {
throw new IllegalArgumentException("Remote store must be enabled to scale to zero");
}
if (!ReplicationType.SEGMENT.equals(replicationType)) {
throw new IllegalArgumentException("Segment replication must be enabled to scale to zero");
}
}*/
this.isRemoveIndexingShards = enabled;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");
Expand Down
Loading

0 comments on commit bfa69d5

Please sign in to comment.