[To dev/1.3][remove datanode] Remove LoadScore sort to fix RegionPriority order #14541

Merged · 1 commit · Dec 25, 2024

@@ -219,15 +219,6 @@ public double getFreeDiskSpace(int dataNodeId) {
    return loadCache.getFreeDiskSpace(dataNodeId);
  }

-  /**
-   * Get the loadScore of each DataNode.
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllDataNodeLoadScores() {
-    return loadCache.getAllDataNodeLoadScores();
-  }
-
  /**
   * Get the lowest loadScore DataNode.
   *

@@ -296,20 +296,17 @@ private synchronized void balanceRegionPriority() {
        new TreeMap<>();
    try {
      Map<TConsensusGroupId, Integer> regionLeaderMap = getLoadManager().getRegionLeaderMap();
-      Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores();

      // Balancing region priority in each SchemaRegionGroup
      Map<TConsensusGroupId, TRegionReplicaSet> optimalRegionPriorityMap =
          priorityRouter.generateOptimalRoutePriority(
              getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion),
-              regionLeaderMap,
-              dataNodeLoadScoreMap);
+              regionLeaderMap);
      // Balancing region priority in each DataRegionGroup
      optimalRegionPriorityMap.putAll(
          priorityRouter.generateOptimalRoutePriority(
              getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion),
-              regionLeaderMap,
-              dataNodeLoadScoreMap));
+              regionLeaderMap));

      optimalRegionPriorityMap.forEach(
          (regionGroupId, optimalRegionPriority) -> {

@@ -19,16 +19,11 @@
 package org.apache.iotdb.confignode.manager.load.balancer.router.priority;

 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;

-import org.apache.tsfile.utils.Pair;
-
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.Vector;

 /** The GreedyPriorityBalancer always pick the Replica with the lowest loadScore */
 public class GreedyPriorityBalancer implements IPriorityBalancer {
@@ -39,48 +34,15 @@ public GreedyPriorityBalancer() {

  @Override
  public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
-      List<TRegionReplicaSet> replicaSets,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap) {
+      List<TRegionReplicaSet> replicaSets, Map<TConsensusGroupId, Integer> regionLeaderMap) {

    Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new TreeMap<>();

    replicaSets.forEach(
        replicaSet -> {
-          TRegionReplicaSet sortedReplicaSet =
-              sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
-          regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
+          regionPriorityMap.put(replicaSet.getRegionId(), replicaSet);
        });

    return regionPriorityMap;
  }
-
-  protected static TRegionReplicaSet sortReplicasByLoadScore(
-      TRegionReplicaSet replicaSet, Map<Integer, Long> dataNodeLoadScoreMap) {
-    TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
-    sortedReplicaSet.setRegionId(replicaSet.getRegionId());
-
-    // List<Pair<loadScore, TDataNodeLocation>> for sorting
-    List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
-    replicaSet
-        .getDataNodeLocations()
-        .forEach(
-            dataNodeLocation -> {
-              // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
-              // heartbeat from that DataNode.
-              // In this case we put a maximum loadScore into the sortList.
-              sortList.add(
-                  new Pair<>(
-                      dataNodeLoadScoreMap.computeIfAbsent(
-                          dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
-                      dataNodeLocation));
-            });
-
-    sortList.sort(Comparator.comparingLong(Pair::getLeft));
-    for (Pair<Long, TDataNodeLocation> entry : sortList) {
-      sortedReplicaSet.addToDataNodeLocations(entry.getRight());
-    }
-
-    return sortedReplicaSet;
-  }
 }

@@ -34,12 +34,9 @@ public interface IPriorityBalancer {
   *
   * @param replicaSets All RegionGroups
   * @param regionLeaderMap The current leader of each RegionGroup
-   * @param dataNodeLoadScoreMap The current load score of each DataNode
   * @return Map<TConsensusGroupId, TRegionReplicaSet>, The optimal route priority for each
   *     RegionGroup. The replica with higher sorting result have higher priority.
   */
  Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
-      List<TRegionReplicaSet> replicaSets,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap);
+      List<TRegionReplicaSet> replicaSets, Map<TConsensusGroupId, Integer> regionLeaderMap);
 }

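For context, a minimal sketch of how the simplified two-argument API is used after this change. It is adapted from the updated GreedyPriorityTest further down; the TDataNodeLocation values and ports are illustrative placeholders built the same way as in that test, and the snippet assumes the same imports.

    // Three placeholder DataNode locations (hypothetical ports, mirroring the test setup).
    List<TDataNodeLocation> locations = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      locations.add(
          new TDataNodeLocation(
              i,
              new TEndPoint("0.0.0.0", 6667 + i),
              new TEndPoint("0.0.0.0", 10730 + i),
              new TEndPoint("0.0.0.0", 10740 + i),
              new TEndPoint("0.0.0.0", 10760 + i),
              new TEndPoint("0.0.0.0", 10750 + i)));
    }
    TConsensusGroupId groupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
    TRegionReplicaSet replicaSet = new TRegionReplicaSet(groupId, locations);

    // No loadScore map anymore: the greedy balancer now keeps the stored replica order,
    // so the RegionPriority follows the order persisted in the replica set itself.
    Map<TConsensusGroupId, TRegionReplicaSet> priority =
        new GreedyPriorityBalancer()
            .generateOptimalRoutePriority(Collections.singletonList(replicaSet), new HashMap<>());
    Assert.assertEquals(locations, priority.get(groupId).getDataNodeLocations());
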
@@ -35,31 +35,24 @@ public LeaderPriorityBalancer() {

  @Override
  public Map<TConsensusGroupId, TRegionReplicaSet> generateOptimalRoutePriority(
-      List<TRegionReplicaSet> replicaSets,
-      Map<TConsensusGroupId, Integer> regionLeaderMap,
-      Map<Integer, Long> dataNodeLoadScoreMap) {
-
+      List<TRegionReplicaSet> replicaSets, Map<TConsensusGroupId, Integer> regionLeaderMap) {
    Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new TreeMap<>();

    replicaSets.forEach(
        replicaSet -> {
-          /* 1. Sort replicaSet by loadScore */
-          TRegionReplicaSet sortedReplicaSet =
-              sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
-
-          /* 2. Pick leader if leader exists and available */
+          /* 1. Pick leader if leader exists and available */
          int leaderId = regionLeaderMap.getOrDefault(replicaSet.getRegionId(), -1);
-          if (leaderId != -1
-              && dataNodeLoadScoreMap.getOrDefault(leaderId, Long.MAX_VALUE) < Long.MAX_VALUE) {
-            for (int i = 0; i < sortedReplicaSet.getDataNodeLocationsSize(); i++) {
-              if (sortedReplicaSet.getDataNodeLocations().get(i).getDataNodeId() == leaderId) {
-                Collections.swap(sortedReplicaSet.getDataNodeLocations(), 0, i);
+          if (leaderId != -1) {
+            for (int i = 0; i < replicaSet.getDataNodeLocationsSize(); i++) {
+              if (replicaSet.getDataNodeLocations().get(i).getDataNodeId() == leaderId) {
+                Collections.swap(replicaSet.getDataNodeLocations(), 0, i);
                break;
              }
            }
          }

-          regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
+          regionPriorityMap.put(replicaSet.getRegionId(), replicaSet);
        });

    return regionPriorityMap;

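Continuing the sketch above, the leader-priority variant now only swaps the known leader to the front and leaves the remaining replicas in their stored order (the leader id 1 below is a hypothetical value, not taken from the PR):

    Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
    leaderMap.put(groupId, 1); // pretend DataNode 1 currently holds the leader

    TRegionReplicaSet prioritized =
        new LeaderPriorityBalancer()
            .generateOptimalRoutePriority(Collections.singletonList(replicaSet), leaderMap)
            .get(groupId);

    // The leader moves to index 0; the other replicas keep their relative order,
    // since they are no longer re-sorted by loadScore: [1, 0, 2].
    Assert.assertEquals(1, prioritized.getDataNodeLocations().get(0).getDataNodeId());
    Assert.assertEquals(0, prioritized.getDataNodeLocations().get(1).getDataNodeId());
    Assert.assertEquals(2, prioritized.getDataNodeLocations().get(2).getDataNodeId());
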
@@ -553,22 +553,6 @@ public double getFreeDiskSpace(int dataNodeId) {
        .orElse(0d);
  }

-  /**
-   * Get the loadScore of each DataNode.
-   *
-   * @return Map<DataNodeId, loadScore>
-   */
-  public Map<Integer, Long> getAllDataNodeLoadScores() {
-    Map<Integer, Long> result = new ConcurrentHashMap<>();
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) -> {
-          if (heartbeatCache instanceof DataNodeHeartbeatCache) {
-            result.put(dataNodeId, heartbeatCache.getLoadScore());
-          }
-        });
-    return result;
-  }
-
  /**
   * Get the lowest loadScore DataNode.
   *

@@ -36,7 +36,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;

 public class GreedyPriorityTest {

@@ -68,17 +67,11 @@ public void testGenLoadScoreGreedyRoutingPolicy() {
    }
    nodeCacheMap.values().forEach(baseNodeCache -> baseNodeCache.updateCurrentStatistics(false));

-    /* Get the loadScoreMap */
-    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) ->
-            loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
-
    /* Build TRegionReplicaSet */
    TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
    TRegionReplicaSet regionReplicaSet1 =
        new TRegionReplicaSet(
-            groupId1, Arrays.asList(dataNodeLocations.get(1), dataNodeLocations.get(0)));
+            groupId1, Arrays.asList(dataNodeLocations.get(0), dataNodeLocations.get(1)));
    TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
    TRegionReplicaSet regionReplicaSet2 =
        new TRegionReplicaSet(
@@ -88,7 +81,7 @@ public void testGenLoadScoreGreedyRoutingPolicy() {
    Map<TConsensusGroupId, TRegionReplicaSet> result =
        new GreedyPriorityBalancer()
            .generateOptimalRoutePriority(
-                Arrays.asList(regionReplicaSet1, regionReplicaSet2), new HashMap<>(), loadScoreMap);
+                Arrays.asList(regionReplicaSet1, regionReplicaSet2), new HashMap<>());
    Assert.assertEquals(2, result.size());

    TRegionReplicaSet result1 = result.get(groupId1);
@@ -97,7 +90,7 @@ public void testGenLoadScoreGreedyRoutingPolicy() {
    }

    TRegionReplicaSet result2 = result.get(groupId2);
-    for (int i = 3; i < 4; i++) {
+    for (int i = 2; i < 4; i++) {
      Assert.assertEquals(dataNodeLocations.get(i), result2.getDataNodeLocations().get(i - 2));
    }
  }

@@ -33,11 +33,9 @@

 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;

 public class LeaderPriorityBalancerTest {

@@ -69,25 +67,19 @@ public void testGenRealTimeRoutingPolicy() {
    }
    nodeCacheMap.values().forEach(baseNodeCache -> baseNodeCache.updateCurrentStatistics(false));

-    // Get the loadScoreMap
-    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
-    nodeCacheMap.forEach(
-        (dataNodeId, heartbeatCache) ->
-            loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
-
    // Build TRegionReplicaSet
    TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
    TRegionReplicaSet regionReplicaSet1 =
        new TRegionReplicaSet(
            groupId1,
            Arrays.asList(
-                dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
+                dataNodeLocations.get(0), dataNodeLocations.get(1), dataNodeLocations.get(2)));
    TConsensusGroupId groupId2 = new TConsensusGroupId(TConsensusGroupType.DataRegion, 2);
    TRegionReplicaSet regionReplicaSet2 =
        new TRegionReplicaSet(
            groupId2,
            Arrays.asList(
-                dataNodeLocations.get(5), dataNodeLocations.get(4), dataNodeLocations.get(3)));
+                dataNodeLocations.get(3), dataNodeLocations.get(4), dataNodeLocations.get(5)));
    List<TRegionReplicaSet> regionReplicaSets = Arrays.asList(regionReplicaSet1, regionReplicaSet2);

    // Build leaderMap
@@ -97,67 +89,16 @@ public void testGenRealTimeRoutingPolicy() {

    // Check result
    Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LeaderPriorityBalancer()
-            .generateOptimalRoutePriority(regionReplicaSets, leaderMap, loadScoreMap);
+        new LeaderPriorityBalancer().generateOptimalRoutePriority(regionReplicaSets, leaderMap);
    TRegionReplicaSet result1 = result.get(groupId1);
    // Leader first
    Assert.assertEquals(dataNodeLocations.get(1), result1.getDataNodeLocations().get(0));
-    // The others will be sorted by loadScore
    Assert.assertEquals(dataNodeLocations.get(0), result1.getDataNodeLocations().get(1));
    Assert.assertEquals(dataNodeLocations.get(2), result1.getDataNodeLocations().get(2));
    TRegionReplicaSet result2 = result.get(groupId2);
    // Leader first
    Assert.assertEquals(dataNodeLocations.get(4), result2.getDataNodeLocations().get(0));
-    // The others will be sorted by loadScore
    Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
    Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
  }
-
-  @Test
-  public void testLeaderUnavailable() {
-    // Build TDataNodeLocations
-    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
-    for (int i = 0; i < 3; i++) {
-      dataNodeLocations.add(
-          new TDataNodeLocation(
-              i,
-              new TEndPoint("0.0.0.0", 6667 + i),
-              new TEndPoint("0.0.0.0", 10730 + i),
-              new TEndPoint("0.0.0.0", 10740 + i),
-              new TEndPoint("0.0.0.0", 10760 + i),
-              new TEndPoint("0.0.0.0", 10750 + i)));
-    }
-
-    // Build TRegionReplicaSet
-    TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
-    TRegionReplicaSet regionReplicaSet1 =
-        new TRegionReplicaSet(
-            groupId1,
-            Arrays.asList(
-                dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
-
-    // Build leaderMap
-    Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
-    leaderMap.put(groupId1, 1);
-
-    // Build loadScoreMap
-    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
-    loadScoreMap.put(0, 10L);
-    loadScoreMap.put(2, 20L);
-    // The leader is DataNode-1, but it's unavailable
-    loadScoreMap.put(1, Long.MAX_VALUE);
-
-    // Check result
-    Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LeaderPriorityBalancer()
-            .generateOptimalRoutePriority(
-                Collections.singletonList(regionReplicaSet1), leaderMap, loadScoreMap);
-    // Only sorted by loadScore since the leader is unavailable
-    Assert.assertEquals(
-        dataNodeLocations.get(0), result.get(groupId1).getDataNodeLocations().get(0));
-    Assert.assertEquals(
-        dataNodeLocations.get(2), result.get(groupId1).getDataNodeLocations().get(1));
-    Assert.assertEquals(
-        dataNodeLocations.get(1), result.get(groupId1).getDataNodeLocations().get(2));
-  }
 }

@@ -209,9 +209,11 @@ public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
    if (impl.isReadOnly()) {
      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
    } else if (!impl.isActive()) {
-      return RpcUtils.getStatus(
-          TSStatusCode.WRITE_PROCESS_REJECT,
-          "peer is inactive and not ready to receive sync log request.");
+      String message =
+          String.format(
+              "Peer is inactive and not ready to write request, %s, DataNode Id: %s",
+              groupId.toString(), impl.getThisNode().getNodeId());
+      return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, message);
    } else {
      return impl.write(request);
    }

@@ -97,8 +97,12 @@ public TSyncLogEntriesRes syncLogEntries(TSyncLogEntriesReq req) {
      return new TSyncLogEntriesRes(Collections.singletonList(status));
    }
    if (!impl.isActive()) {
+      String message =
+          String.format(
+              "Peer is inactive and not ready to receive sync log request, %s, DataNode Id: %s",
+              groupId, impl.getThisNode().getNodeId());
      TSStatus status = new TSStatus(TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode());
-      status.setMessage("peer is inactive and not ready to receive sync log request");
+      status.setMessage(message);
      return new TSyncLogEntriesRes(Collections.singletonList(status));
    }
    BatchIndexedConsensusRequest logEntriesInThisBatch =

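For illustration, the new status message has this shape; the group-id rendering and node id below are hypothetical values, not taken from the PR:

    String example =
        String.format(
            "Peer is inactive and not ready to receive sync log request, %s, DataNode Id: %s",
            "DataRegion[1]", 3);
    // example: "Peer is inactive and not ready to receive sync log request, DataRegion[1], DataNode Id: 3"
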