[remove datanode] Remove LoadScore sort to fix RegionPriority order (#14527) (#14541)

(cherry picked from commit 17729a9)
HxpSerein authored Dec 25, 2024
1 parent ccb3276 commit f99557f
Showing 10 changed files with 28 additions and 164 deletions.
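
What changed, in brief: previously every priority balancer re-sorted each region's replica list by the DataNodes' heartbeat-derived load scores before publishing a route priority, so the RegionPriority order could reshuffle whenever scores fluctuated, and a DataNode with no heartbeat (e.g. one being removed) was scored Long.MAX_VALUE and perturbed the order further. This commit drops the load-score sort everywhere: GreedyPriorityBalancer now keeps the replica set's stored order, and LeaderPriorityBalancer only swaps the current leader to the front. A minimal, self-contained sketch of the two behaviors (plain integer IDs stand in for TDataNodeLocation; this illustrates the semantics, not the IoTDB classes themselves):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class PriorityOrderDemo {

  // Old behavior (removed by this commit): priority = replicas sorted by load score.
  // A DataNode absent from the score map (e.g. one being removed, with no heartbeat)
  // was treated as Long.MAX_VALUE and pushed to the back.
  static List<Integer> oldPriority(List<Integer> replicas, Map<Integer, Long> loadScores) {
    List<Integer> sorted = new ArrayList<>(replicas);
    sorted.sort(
        Comparator.comparingLong(
            (Integer id) -> loadScores.getOrDefault(id, Long.MAX_VALUE)));
    return sorted;
  }

  // New behavior: priority preserves the replica set's stored order.
  static List<Integer> newPriority(List<Integer> replicas) {
    return new ArrayList<>(replicas);
  }

  public static void main(String[] args) {
    List<Integer> replicas = List.of(1, 2, 3);
    // Heartbeat-derived scores fluctuate between balancing rounds, so the old priority flips:
    System.out.println(oldPriority(replicas, Map.of(1, 30L, 2, 20L, 3, 10L))); // [3, 2, 1]
    System.out.println(oldPriority(replicas, Map.of(1, 10L, 2, 20L, 3, 30L))); // [1, 2, 3]
    // The new priority is stable regardless of load scores:
    System.out.println(newPriority(replicas)); // [1, 2, 3]
  }
}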
@@ -219,15 +219,6 @@ public double getFreeDiskSpace(int dataNodeId) {
     return loadCache.getFreeDiskSpace(dataNodeId);
   }
 
-  /**
-   * Get the loadScore of each DataNode.
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllDataNodeLoadScores() {
-    return loadCache.getAllDataNodeLoadScores();
-  }
-
   /**
    * Get the lowest loadScore DataNode.
    *
@@ -296,20 +296,17 @@ private synchronized void balanceRegionPriority() {
         new TreeMap<>();
     try {
       Map<TConsensusGroupId, Integer> regionLeaderMap = getLoadManager().getRegionLeaderMap();
-      Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();
 
       // Balancing region priority in each SchemaRegionGroup
       Map<TConsensusGroupId, TRegionReplicaSet> optimalRegionPriorityMap =
           priorityRouter.generateOptimalRoutePriority(
               getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion),
-              regionLeaderMap,
-              dataNodeLoadScoreMap);
+              regionLeaderMap);
       // Balancing region priority in each DataRegionGroup
       optimalRegionPriorityMap.putAll(
           priorityRouter.generateOptimalRoutePriority(
               getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion),
-              regionLeaderMap,
-              dataNodeLoadScoreMap));
+              regionLeaderMap));
 
       optimalRegionPriorityMap.forEach(
           (regionGroupId, optimalRegionPriority) -> {
@@ -19,16 +19,11 @@
 package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
-import org.apache.tsfile.utils.Pair;
-
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.Vector;
 
 /** The GreedyPriorityBalancer always pick the Replica with the lowest loadScore */
 public class GreedyPriorityBalancer implements IPriorityBalancer {
@@ -39,48 +34,15 @@ public GreedyPriorityBalancer() {
 
   @Override
   public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
-      List<TRegionReplicaSet> replicaSets,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap) {
+      List<TRegionReplicaSet> replicaSets, Map<TConsensusGroupId, Integer> regionLeaderMap) {
 
     Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new TreeMap<>();
 
     replicaSets.forEach(
         replicaSet -> {
-          TRegionReplicaSet sortedReplicaSet =
-              sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
-          regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
+          regionPriorityMap.put(replicaSet.getRegionId(), replicaSet);
         });
 
     return regionPriorityMap;
   }
-
-  protected static TRegionReplicaSet sortReplicasByLoadScore(
-      TRegionReplicaSet replicaSet, Map<Integer, Long> dataNodeLoadScoreMap) {
-    TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
-    sortedReplicaSet.setRegionId(replicaSet.getRegionId());
-
-    // List<Pair<loadScore, TDataNodeLocation>> for sorting
-    List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
-    replicaSet
-        .getDataNodeLocations()
-        .forEach(
-            dataNodeLocation -> {
-              // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
-              // heartbeat from that DataNode.
-              // In this case we put a maximum loadScore into the sortList.
-              sortList.add(
-                  new Pair<>(
-                      dataNodeLoadScoreMap.computeIfAbsent(
-                          dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
-                      dataNodeLocation));
-            });
-
-    sortList.sort(Comparator.comparingLong(Pair::getLeft));
-    for (Pair<Long, TDataNodeLocation> entry : sortList) {
-      sortedReplicaSet.addToDataNodeLocations(entry.getRight());
-    }
-
-    return sortedReplicaSet;
-  }
 }
@@ -34,12 +34,9 @@ public interface IPriorityBalancer {
    *
    * @param replicaSets All RegionGroups
    * @param regionLeaderMap The current leader of each RegionGroup
-   * @param dataNodeLoadScoreMap The current load score of each DataNode
    * @return Map<TConsensusGroupId, TRegionReplicaSet>, The optimal route priority for each
    *     RegionGroup. The replica with higher sorting result have higher priority.
    */
   Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
-      List<TRegionReplicaSet> replicaSets,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap);
+      List<TRegionReplicaSet> replicaSets, Map<TConsensusGroupId, Integer> regionLeaderMap);
 }
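
The priority map this interface returns is consumed by routing code that tries replicas in list order, first entry first. As an illustration only (a hypothetical helper, not part of this commit; it uses the Thrift getters that appear in the diffs above), a caller that resolves the first DataNode to try for a given region:

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;

import java.util.Map;

/** Hypothetical illustration of how a route-priority map is consumed. */
public class RoutePriorityReader {

  /**
   * Returns the DataNode to try first for the given region: index 0 of the priority
   * list, i.e. the leader under LeaderPriorityBalancer, or simply the first stored
   * replica under GreedyPriorityBalancer.
   */
  static TDataNodeLocation firstChoice(
      Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap, TConsensusGroupId regionId) {
    TRegionReplicaSet priority = regionPriorityMap.get(regionId);
    if (priority == null || priority.getDataNodeLocationsSize() == 0) {
      return null; // unknown region or empty replica set
    }
    return priority.getDataNodeLocations().get(0);
  }
}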
@@ -35,31 +35,24 @@ public LeaderPriorityBalancer() {
 
   @Override
   public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
-      List<TRegionReplicaSet> replicaSets,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap) {
-
+      List<TRegionReplicaSet> replicaSets, Map<TConsensusGroupId, Integer> regionLeaderMap) {
     Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new TreeMap<>();
 
     replicaSets.forEach(
         replicaSet -> {
-          /* 1. Sort replicaSet by loadScore */
-          TRegionReplicaSet sortedReplicaSet =
-              sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
-
-          /* 2. Pick leader if leader exists and available */
+          /* 1. Pick leader if leader exists and available */
           int leaderId = regionLeaderMap.getOrDefault(replicaSet.getRegionId(), -1);
-          if (leaderId != -1
-              && dataNodeLoadScoreMap.getOrDefault(leaderId, Long.MAX_VALUE) < Long.MAX_VALUE) {
-            for (int i = 0; i < sortedReplicaSet.getDataNodeLocationsSize(); i++) {
-              if (sortedReplicaSet.getDataNodeLocations().get(i).getDataNodeId() == leaderId) {
-                Collections.swap(sortedReplicaSet.getDataNodeLocations(), 0, i);
+          if (leaderId != -1) {
+            for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) {
+              if (replicaSet.getDataNodeLocations().get(i).getDataNodeId() == leaderId) {
+                Collections.swap(replicaSet.getDataNodeLocations(), 0, i);
                 break;
               }
             }
           }
 
-          regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
+          regionPriorityMap.put(replicaSet.getRegionId(), replicaSet);
         });
 
     return regionPriorityMap;
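
One subtlety of the new logic is worth spelling out: Collections.swap exchanges the leader with whatever occupied slot 0, so the displaced head lands in the leader's old position; the replicas are not rotated. A small stand-alone sketch of the reordering (plain integer IDs as stand-ins for TDataNodeLocation, not the IoTDB classes):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LeaderSwapDemo {

  // Mirrors the loop above: find the leader and swap it into slot 0.
  static List<Integer> leaderFirst(List<Integer> replicas, int leaderId) {
    List<Integer> priority = new ArrayList<>(replicas);
    for (int i = 0; i < priority.size(); i++) {
      if (priority.get(i) == leaderId) {
        Collections.swap(priority, 0, i);
        break;
      }
    }
    return priority;
  }

  public static void main(String[] args) {
    // The leader is swapped with the old head, not rotated in front of it:
    System.out.println(leaderFirst(List.of(3, 4, 5), 5)); // [5, 4, 3], not [5, 3, 4]
    // This matches the updated test below: replicas [3, 4, 5] with leader 4 -> [4, 3, 5].
    System.out.println(leaderFirst(List.of(3, 4, 5), 4)); // [4, 3, 5]
    // Unknown leader (-1): the stored order is kept unchanged.
    System.out.println(leaderFirst(List.of(3, 4, 5), -1)); // [3, 4, 5]
  }
}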
@@ -553,22 +553,6 @@ public double getFreeDiskSpace(int dataNodeId) {
         .orElse(0d);
   }
 
-  /**
-   * Get the loadScore of each DataNode.
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllDataNodeLoadScores() {
-    Map<Integer, Long> result = new ConcurrentHashMap<>();
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> {
-          if (heartbeatCache instanceof DataNodeHeartbeatCache) {
-            result.put(dataNodeId, heartbeatCache.getLoadScore());
-          }
-        });
-    return result;
-  }
-
   /**
    * Get the lowest loadScore DataNode.
    *
@@ -36,7 +36,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class GreedyPriorityTest {
 
@@ -68,17 +67,11 @@ public void testGenLoadScoreGreedyRoutingPolicy() {
     }
     nodeCacheMap.values().forEach(baseNodeCache -> baseNodeCache.updateCurrentStatistics(false));
 
-    /* Get the loadScoreMap */
-    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) ->
-            loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
-
     /* Build TRegionReplicaSet */
     TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
     TRegionReplicaSet regionReplicaSet1 =
         new TRegionReplicaSet(
-            groupId1, Arrays.asList(dataNodeLocations.get(1), dataNodeLocations.get(0)));
+            groupId1, Arrays.asList(dataNodeLocations.get(0), dataNodeLocations.get(1)));
     TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
     TRegionReplicaSet regionReplicaSet2 =
         new TRegionReplicaSet(
@@ -88,7 +81,7 @@ public void testGenLoadScoreGreedyRoutingPolicy() {
     Map<TConsensusGroupId, TRegionReplicaSet> result =
         new GreedyPriorityBalancer()
             .generateOptimalRoutePriority(
-                Arrays.asList(regionReplicaSet1, regionReplicaSet2), new HashMap<>(), loadScoreMap);
+                Arrays.asList(regionReplicaSet1, regionReplicaSet2), new HashMap<>());
     Assert.assertEquals(2, result.size());
 
     TRegionReplicaSet result1 = result.get(groupId1);
@@ -97,7 +90,7 @@ public void testGenLoadScoreGreedyRoutingPolicy() {
     }
 
     TRegionReplicaSet result2 = result.get(groupId2);
-    for (int i = 3; i < 4; i++) {
+    for (int i = 2; i < 4; i++) {
       Assert.assertEquals(dataNodeLocations.get(i), result2.getDataNodeLocations().get(i - 2));
     }
   }
@@ -33,11 +33,9 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class LeaderPriorityBalancerTest {
 
@@ -69,25 +67,19 @@ public void testGenRealTimeRoutingPolicy() {
     }
     nodeCacheMap.values().forEach(baseNodeCache -> baseNodeCache.updateCurrentStatistics(false));
 
-    // Get the loadScoreMap
-    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) ->
-            loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
-
    // Build TRegionReplicaSet
     TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
     TRegionReplicaSet regionReplicaSet1 =
         new TRegionReplicaSet(
             groupId1,
             Arrays.asList(
-                dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
+                dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(2)));
     TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
     TRegionReplicaSet regionReplicaSet2 =
         new TRegionReplicaSet(
             groupId2,
             Arrays.asList(
-                dataNodeLocations.get(5), dataNodeLocations.get(4), dataNodeLocations.get(3)));
+                dataNodeLocations.get(3), dataNodeLocations.get(4), dataNodeLocations.get(5)));
     List<TRegionReplicaSet> regionReplicaSets = Arrays.asList(regionReplicaSet1, regionReplicaSet2);
 
     // Build leaderMap
@@ -97,67 +89,16 @@ public void testGenRealTimeRoutingPolicy() {
 
     // Check result
     Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LeaderPriorityBalancer()
-            .generateOptimalRoutePriority(regionReplicaSets, leaderMap, loadScoreMap);
+        new LeaderPriorityBalancer().generateOptimalRoutePriority(regionReplicaSets, leaderMap);
     TRegionReplicaSet result1 = result.get(groupId1);
     // Leader first
     Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
-    // The others will be sorted by loadScore
     Assert.assertEquals(dataNodeLocations.get(0), result1.getDataNodeLocations().get(1));
     Assert.assertEquals(dataNodeLocations.get(2), result1.getDataNodeLocations().get(2));
     TRegionReplicaSet result2 = result.get(groupId2);
     // Leader first
     Assert.assertEquals(dataNodeLocations.get(4), result2.getDataNodeLocations().get(0));
-    // The others will be sorted by loadScore
     Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
     Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
   }
-
-  @Test
-  public void testLeaderUnavailable() {
-    // Build TDataNodeLocations
-    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
-    for (int i = 0; i < 3; i++) {
-      dataNodeLocations.add(
-          new TDataNodeLocation(
-              i,
-              new TEndPoint("0.0.0.0", 6667 + i),
-              new TEndPoint("0.0.0.0", 10730 + i),
-              new TEndPoint("0.0.0.0", 10740 + i),
-              new TEndPoint("0.0.0.0", 10760 + i),
-              new TEndPoint("0.0.0.0", 10750 + i)));
-    }
-
-    // Build TRegionReplicaSet
-    TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
-    TRegionReplicaSet regionReplicaSet1 =
-        new TRegionReplicaSet(
-            groupId1,
-            Arrays.asList(
-                dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
-
-    // Build leaderMap
-    Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
-    leaderMap.put(groupId1, 1);
-
-    // Build loadScoreMap
-    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
-    loadScoreMap.put(0, 10L);
-    loadScoreMap.put(2, 20L);
-    // The leader is DataNode-1, but it's unavailable
-    loadScoreMap.put(1, Long.MAX_VALUE);
-
-    // Check result
-    Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LeaderPriorityBalancer()
-            .generateOptimalRoutePriority(
-                Collections.singletonList(regionReplicaSet1), leaderMap, loadScoreMap);
-    // Only sorted by loadScore since the leader is unavailable
-    Assert.assertEquals(
-        dataNodeLocations.get(0), result.get(groupId1).getDataNodeLocations().get(0));
-    Assert.assertEquals(
-        dataNodeLocations.get(2), result.get(groupId1).getDataNodeLocations().get(1));
-    Assert.assertEquals(
-        dataNodeLocations.get(1), result.get(groupId1).getDataNodeLocations().get(2));
-  }
 }
@@ -209,9 +209,11 @@ public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
     if (impl.isReadOnly()) {
       return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
     } else if (!impl.isActive()) {
-      return RpcUtils.getStatus(
-          TSStatusCode.WRITE_PROCESS_REJECT,
-          "peer is inactive and not ready to receive sync log request.");
+      String message =
+          String.format(
+              "Peer is inactive and not ready to write request, %s, DataNode Id: %s",
+              groupId.toString(), impl.getThisNode().getNodeId());
+      return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, message);
     } else {
       return impl.write(request);
     }
@@ -97,8 +97,12 @@ public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
       return new TSyncLogEntriesRes(Collections.singletonList(status));
     }
     if (!impl.isActive()) {
+      String message =
+          String.format(
+              "Peer is inactive and not ready to receive sync log request, %s, DataNode Id: %s",
+              groupId, impl.getThisNode().getNodeId());
       TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
-      status.setMessage("peer is inactive and not ready to receive sync log request");
+      status.setMessage(message);
       return new TSyncLogEntriesRes(Collections.singletonList(status));
     }
     BatchIndexedConsensusRequest logEntriesInThisBatch =
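
Both consensus-layer rejections now identify the consensus group and the node, which the old fixed string did not. A rough sketch of the resulting message, with placeholder values (the exact group-ID rendering depends on the groupId's toString and is an assumption here):

public class InactivePeerMessageDemo {
  public static void main(String[] args) {
    // Assumed placeholders: a group whose toString renders as "DataRegion[1]",
    // hosted on DataNode 2. The real call sites pass groupId and
    // impl.getThisNode().getNodeId() instead.
    String message =
        String.format(
            "Peer is inactive and not ready to receive sync log request, %s, DataNode Id: %s",
            "DataRegion[1]", 2);
    System.out.println(message);
    // Prints: Peer is inactive and not ready to receive sync log request, DataRegion[1], DataNode Id: 2
  }
}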
