Skip to content

Commit

Permalink
Skip unnecessary string format in ThrottlingAllocationDecider when no…
Browse files Browse the repository at this point in the history
…t in debug mode (opensearch-project#13750) (opensearch-project#13762)

* Skip unnecessary String format in throttling allocation decider when not in debug mode

Signed-off-by: Rishab Nahata <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored and kkewwei committed Jul 24, 2024
1 parent dbb8dbd commit 8bfa772
Showing 1 changed file with 49 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;

import java.util.Locale;
import java.util.function.BiFunction;

import static org.opensearch.cluster.routing.allocation.decider.Decision.THROTTLE;
Expand Down Expand Up @@ -211,20 +210,9 @@ private Decision allocateInitialShardCopies(ShardRouting shardRouting, RoutingNo
allocation,
currentInRecoveries,
replicasInitialRecoveries,
(x, y) -> getInitialPrimaryNodeOutgoingRecoveries(x, y),
this::getInitialPrimaryNodeOutgoingRecoveries,
replicasInitialRecoveries,
String.format(
Locale.ROOT,
"[%s=%d]",
CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(),
replicasInitialRecoveries
),
String.format(
Locale.ROOT,
"[%s=%d]",
CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(),
replicasInitialRecoveries
)
true
);
}

Expand All @@ -238,22 +226,9 @@ private Decision allocateNonInitialShardCopies(ShardRouting shardRouting, Routin
allocation,
currentInRecoveries,
concurrentIncomingRecoveries,
(x, y) -> getPrimaryNodeOutgoingRecoveries(x, y),
this::getPrimaryNodeOutgoingRecoveries,
concurrentOutgoingRecoveries,
String.format(
Locale.ROOT,
"[%s=%d] (can also be set via [%s])",
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(),
concurrentIncomingRecoveries,
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()
),
String.format(
Locale.ROOT,
"[%s=%d] (can also be set via [%s])",
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(),
concurrentOutgoingRecoveries,
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()
)
false
);
}

Expand All @@ -274,18 +249,30 @@ private Decision allocateShardCopies(
int inRecoveriesLimit,
BiFunction<ShardRouting, RoutingAllocation, Integer> primaryNodeOutRecoveriesFunc,
int outRecoveriesLimit,
String incomingRecoveriesSettingMsg,
String outGoingRecoveriesSettingMsg
boolean isInitialShardCopies
) {
// Allocating a shard to this node will increase the incoming recoveries
if (currentInRecoveries >= inRecoveriesLimit) {
return allocation.decision(
THROTTLE,
NAME,
"reached the limit of incoming shard recoveries [%d], cluster setting %s",
currentInRecoveries,
incomingRecoveriesSettingMsg
);
if (isInitialShardCopies) {
return allocation.decision(
THROTTLE,
NAME,
"reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d]",
currentInRecoveries,
CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(),
inRecoveriesLimit
);
} else {
return allocation.decision(
THROTTLE,
NAME,
"reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d] (can also be set via [%s])",
currentInRecoveries,
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(),
inRecoveriesLimit,
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()
);
}
} else {
// search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
Expand All @@ -294,14 +281,30 @@ private Decision allocateShardCopies(
}
int primaryNodeOutRecoveries = primaryNodeOutRecoveriesFunc.apply(shardRouting, allocation);
if (primaryNodeOutRecoveries >= outRecoveriesLimit) {
return allocation.decision(
THROTTLE,
NAME,
"reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + "cluster setting %s",
primaryNodeOutRecoveries,
primaryShard.currentNodeId(),
outGoingRecoveriesSettingMsg
);
if (isInitialShardCopies) {
return allocation.decision(
THROTTLE,
NAME,
"reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, "
+ "cluster setting [%s=%d]",
primaryNodeOutRecoveries,
primaryShard.currentNodeId(),
CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(),
inRecoveriesLimit
);
} else {
return allocation.decision(
THROTTLE,
NAME,
"reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, "
+ "cluster setting [%s=%d] (can also be set via [%s])",
primaryNodeOutRecoveries,
primaryShard.currentNodeId(),
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(),
outRecoveriesLimit,
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()
);
}
} else {
return allocation.decision(
YES,
Expand Down

0 comments on commit 8bfa772

Please sign in to comment.