From c3a6aa196e9372153eb1345fb45f609eeb248d02 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 25 Jul 2024 16:23:43 -0700 Subject: [PATCH] Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering Signed-off-by: Neetika Singhal --- .../remotestore/WritableWarmIT.java | 2 +- .../admin/indices/tiering/TieringUtils.java | 51 +++++++++ .../cluster/routing/RoutingPool.java | 4 +- .../allocator/LocalShardsBalancer.java | 10 +- .../decider/TargetPoolAllocationDecider.java | 30 ++++++ .../ShardsTieringAllocationTests.java | 100 ++++++++++++++++++ .../TieringAllocationBaseTestCase.java | 60 +++++++++++ 7 files changed, 252 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java index a51bd6b20fff0..7dc92d45d2366 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java @@ -94,7 +94,7 @@ public void testWritableWarmFeatureFlagDisabled() { public void testWritableWarmBasic() throws Exception { InternalTestCluster internalTestCluster = internalCluster(); internalTestCluster.startClusterManagerOnlyNode(); - internalTestCluster.startDataOnlyNode(); + internalTestCluster.startDataAndSearchNodes(1); Settings settings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java new file mode 100644 index 0000000000000..fe62da2b7ceb7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; + +/** + * Utility class for tiering operations + * + * @opensearch.internal + */ +@ExperimentalApi +public class TieringUtils { + + /** + * Checks if the specified shard is a partial shard by + * checking the INDEX_STORE_LOCALITY_SETTING for its index. + * see {@link #isPartialIndex(IndexMetadata)} + * @param shard ShardRouting object representing the shard + * @param allocation RoutingAllocation object representing the allocation + * @return true if the shard is a partial shard, false otherwise + */ + public static boolean isPartialShard(ShardRouting shard, RoutingAllocation allocation) { + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index()); + return isPartialIndex(indexMetadata); + } + + /** + * Checks if the specified index is a partial index by + * checking the INDEX_STORE_LOCALITY_SETTING for the index. + * + * @param indexMetadata the metadata of the index + * @return true if the index is a partial index, false otherwise + */ + public static boolean isPartialIndex(final IndexMetadata indexMetadata) { + return FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) + && IndexModule.DataLocalityType.PARTIAL.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java index db10ad61c7d6d..590e86d85794a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -12,6 +12,8 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex; + /** * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods * help decide the capabilities of a specific node as well as an index or shard based on the index configuration. @@ -58,6 +60,6 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all * @return {@link RoutingPool} for the given index. */ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { - return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY; + return indexMetadata.isRemoteSnapshot() || isPartialIndex(indexMetadata) ? REMOTE_CAPABLE : LOCAL_ONLY; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 00eb79add9f1d..c78cb8dcaa37c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -43,6 +43,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialShard; import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; /** @@ -569,7 +570,8 @@ void moveShards() { ShardRouting shardRouting = it.next(); - if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation)) + && !isPartialShard(shardRouting, allocation)) { continue; } @@ -635,7 +637,8 @@ void moveShards() { */ @Override MoveDecision decideMove(final ShardRouting shardRouting) { - if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation)) + && !isPartialShard(shardRouting, allocation)) { return MoveDecision.NOT_TAKEN; } @@ -724,7 +727,8 @@ private Map buildModelFromAssigned() for (ShardRouting shard : rn) { assert rn.nodeId().equals(shard.currentNodeId()); /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) { + if ((RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) || isPartialShard(shard, allocation)) + && shard.state() != RELOCATING) { node.addShard(shard); ++totalShardCount; if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java index 76f9f44077ad8..493d23b57d271 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -87,6 +87,36 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n return canAllocate(shardRouting, node, allocation); } + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + RoutingPool targetPool = RoutingPool.getShardPool(shardRouting, allocation); + RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shardRouting.currentNodeId())); + if (RoutingPool.REMOTE_CAPABLE.equals(targetPool) && targetPool != currentNodePool) { + logger.debug( + "Shard: [{}] has current pool: [{}], target pool: [{}]. Cannot remain on node: [{}]", + shardRouting.shortSummary(), + currentNodePool.name(), + RoutingPool.REMOTE_CAPABLE.name(), + node.node() + ); + return allocation.decision( + Decision.NO, + NAME, + "Shard %s is allocated on a different pool %s than the target pool %s", + shardRouting.shortSummary(), + currentNodePool, + targetPool + ); + } + return allocation.decision( + Decision.YES, + NAME, + "Routing pools are compatible. Shard pool: [%s], node pool: [%s]", + currentNodePool, + targetPool + ); + } + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex()); return canAllocateInTargetPool(indexMetadata, node, allocation); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java new file mode 100644 index 0000000000000..54f446b38b61b --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.index.IndexModule; + +import static org.opensearch.cluster.routing.RoutingPool.LOCAL_ONLY; +import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; +import static org.opensearch.cluster.routing.RoutingPool.getIndexPool; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; + +public class ShardsTieringAllocationTests extends TieringAllocationBaseTestCase { + + public void testShardsWithNoTiering() { + int localOnlyNodes = 5; + int remoteCapableNodes = 3; + int localIndices = 5; + int remoteIndices = 0; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + // assign shards to respective nodes + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + assertEquals(LOCAL_ONLY, shardPool); + } + } + + public void testShardsWithTiering() { + int localOnlyNodes = 15; + int remoteCapableNodes = 13; + int localIndices = 10; + int remoteIndices = 0; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + // assign shards to respective nodes + clusterState = allocateShardsAndBalance(clusterState, service); + // put indices in the hot to warm tiering state + clusterState = updateIndexMetadataForTiering( + clusterState, + localIndices, + IndexModule.TieringState.HOT_TO_WARM.name(), + IndexModule.DataLocalityType.PARTIAL.name() + ); + // trigger shard relocation + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + assertEquals(RoutingPool.REMOTE_CAPABLE, shardPool); + assertEquals(nodePool, shardPool); + } + } + + public void testShardPoolForPartialIndices() { + String index = "test-index"; + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + .build(); + RoutingPool indexPool = getIndexPool(indexMetadata); + assertEquals(REMOTE_CAPABLE, indexPool); + } + + public void testShardPoolForFullIndices() { + String index = "test-index"; + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + .build(); + RoutingPool indexPool = getIndexPool(indexMetadata); + assertEquals(LOCAL_ONLY, indexPool); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java new file mode 100644 index 0000000000000..5047a23f0dfff --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +@SuppressForbidden(reason = "feature flag overrides") +public abstract class TieringAllocationBaseTestCase extends RemoteShardsBalancerBaseTestCase { + + @BeforeClass + public static void setup() { + System.setProperty(FeatureFlags.TIERED_REMOTE_INDEX, "true"); + } + + @AfterClass + public static void teardown() { + System.setProperty(FeatureFlags.TIERED_REMOTE_INDEX, "false"); + } + + public ClusterState updateIndexMetadataForTiering( + ClusterState clusterState, + int localIndices, + String tieringState, + String dataLocality + ) { + Metadata.Builder mb = Metadata.builder(clusterState.metadata()); + for (int i = 0; i < localIndices; i++) { + IndexMetadata indexMetadata = clusterState.metadata().index(getIndexName(i, false)); + Settings settings = indexMetadata.getSettings(); + mb.put( + IndexMetadata.builder(indexMetadata) + .settings( + Settings.builder() + .put(settings) + .put(settings) + .put(INDEX_TIERING_STATE.getKey(), tieringState) + .put(INDEX_STORE_LOCALITY_SETTING.getKey(), dataLocality) + ) + ); + } + Metadata metadata = mb.build(); + return ClusterState.builder(clusterState).metadata(metadata).build(); + } +}