Skip to content

Commit

Permalink
[region migration] Remove retry of RegionMigrateService::addPeer (#14362
Browse files Browse the repository at this point in the history
)

* done

* synchronized & snapshot name

* remove test

* for removePeer and resetPeerList
  • Loading branch information
liyuheng55555 authored Dec 12, 2024
1 parent 88ffa48 commit 4be9909
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,53 +296,55 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
if (impl.getConfiguration().contains(peer)) {
throw new PeerAlreadyInConsensusGroupException(groupId, peer);
}
try {
// step 1: inactive new Peer to prepare for following steps
logger.info("[IoTConsensus] inactivate new peer: {}", peer);
impl.inactivePeer(peer, false);
synchronized (impl) {
if (impl.getConfiguration().contains(peer)) {
throw new PeerAlreadyInConsensusGroupException(groupId, peer);
}
try {
// step 1: inactive new Peer to prepare for following steps
logger.info("[IoTConsensus] inactivate new peer: {}", peer);
impl.inactivePeer(peer, false);

// step 2: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);
// step 2: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);

// step 3: take snapshot
logger.info("[IoTConsensus] start to take snapshot...");
// step 3: take snapshot
logger.info("[IoTConsensus] start to take snapshot...");

impl.takeSnapshot();
impl.takeSnapshot();

// step 4: transit snapshot
logger.info("[IoTConsensus] start to transmit snapshot...");
impl.transmitSnapshot(peer);
// step 4: transit snapshot
logger.info("[IoTConsensus] start to transmit snapshot...");
impl.transmitSnapshot(peer);

// step 5: let the new peer load snapshot
logger.info("[IoTConsensus] trigger new peer to load snapshot...");
impl.triggerSnapshotLoad(peer);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
// step 5: let the new peer load snapshot
logger.info("[IoTConsensus] trigger new peer to load snapshot...");
impl.triggerSnapshotLoad(peer);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

// step 6: active new Peer
logger.info("[IoTConsensus] activate new peer...");
impl.activePeer(peer);
// step 6: active new Peer
logger.info("[IoTConsensus] activate new peer...");
impl.activePeer(peer);

// step 7: notify remote peer to clean up transferred snapshot
logger.info("[IoTConsensus] clean up remote snapshot...");
try {
impl.cleanupRemoteSnapshot(peer);
} catch (ConsensusGroupModifyPeerException e) {
logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
}
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);

// step 7: notify remote peer to clean up transferred snapshot
logger.info("[IoTConsensus] clean up remote snapshot...");
try {
impl.cleanupRemoteSnapshot(peer);
} catch (ConsensusGroupModifyPeerException e) {
logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e);
logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects...");
// try to clean up the sync log channel
impl.notifyPeersToRemoveSyncLogChannel(peer);
throw new ConsensusException(e);
} finally {
logger.info("[IoTConsensus] clean up local snapshot...");
impl.cleanupLocalSnapshot();
}
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);

} catch (ConsensusGroupModifyPeerException e) {
logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects...");
// try to clean up the sync log channel
impl.notifyPeersToRemoveSyncLogChannel(peer);
throw new ConsensusException(e);
} finally {
logger.info("[IoTConsensus] clean up local snapshot...");
impl.cleanupLocalSnapshot();
}
}

Expand All @@ -352,29 +354,32 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));

if (!impl.getConfiguration().contains(peer)) {
throw new PeerNotInConsensusGroupException(groupId, peer.toString());
}
synchronized (impl) {
if (!impl.getConfiguration().contains(peer)) {
throw new PeerNotInConsensusGroupException(groupId, peer.toString());
}

KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);

// let other peers remove the sync channel with target peer
impl.notifyPeersToRemoveSyncLogChannel(peer);
KillPoint.setKillPoint(
IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL);
// let other peers remove the sync channel with target peer
impl.notifyPeersToRemoveSyncLogChannel(peer);
KillPoint.setKillPoint(
IoTConsensusRemovePeerCoordinatorKillPoints
.AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL);

try {
// let target peer reject new write
impl.inactivePeer(peer, true);
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
// wait its SyncLog to complete
impl.waitTargetPeerUntilSyncLogCompleted(peer);
// wait its region related resource to release
impl.waitReleaseAllRegionRelatedResource(peer);
} catch (ConsensusGroupModifyPeerException e) {
throw new ConsensusException(e.getMessage());
try {
// let target peer reject new write
impl.inactivePeer(peer, true);
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
// wait its SyncLog to complete
impl.waitTargetPeerUntilSyncLogCompleted(peer);
// wait its region related resource to release
impl.waitReleaseAllRegionRelatedResource(peer);
} catch (ConsensusGroupModifyPeerException e) {
throw new ConsensusException(e.getMessage());
}
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}

@Override
Expand Down Expand Up @@ -485,6 +490,7 @@ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));

Peer localPeer = new Peer(groupId, thisNodeId, thisNode);
if (!correctPeers.contains(localPeer)) {
logger.info(
Expand All @@ -493,25 +499,28 @@ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
deleteLocalPeer(groupId);
return;
}
ImmutableList<Peer> currentMembers = ImmutableList.copyOf(impl.getConfiguration());
String previousPeerListStr = currentMembers.toString();
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
logger.error(
"[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}",
peer,
groupId);

synchronized (impl) {
ImmutableList<Peer> currentMembers = ImmutableList.copyOf(impl.getConfiguration());
String previousPeerListStr = currentMembers.toString();
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
logger.error(
"[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}",
peer,
groupId);
}
}
}
}
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
previousPeerListStr,
impl.getConfiguration());
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer);
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
previousPeerListStr,
impl.getConfiguration());
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -276,10 +277,9 @@ public DataSet read(IConsensusRequest request) {

public void takeSnapshot() throws ConsensusGroupModifyPeerException {
try {
long newSnapshotIndex = getLatestSnapshotIndex() + 1;
newSnapshotDirName =
String.format(
"%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), newSnapshotIndex);
"%s_%s_%s", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), UUID.randomUUID());
File snapshotDir = new File(storageDir, newSnapshotDirName);
if (snapshotDir.exists()) {
FileUtils.deleteDirectory(snapshotDir);
Expand Down Expand Up @@ -400,22 +400,6 @@ private String calculateSnapshotPath(String snapshotId, String originalFilePath)
return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
}

private long getLatestSnapshotIndex() {
long snapShotIndex = 0;
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
if (versionFiles == null || versionFiles.length == 0) {
return snapShotIndex;
}
for (File file : versionFiles) {
snapShotIndex =
Math.max(
snapShotIndex,
Long.parseLong(SNAPSHOT_INDEX_PATTEN.matcher(file.getName()).replaceAll("")));
}
return snapShotIndex;
}

private void clearOldSnapshot() {
File directory = new File(storageDir);
File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
Expand Down Expand Up @@ -95,7 +94,6 @@ public void allTest() throws Exception {
peerTest();
transferLeader();
snapshotTest();
snapshotUpgradeTest();
}

public void addConsensusGroup() {
Expand Down Expand Up @@ -212,38 +210,4 @@ public void snapshotTest() throws ConsensusException {
Assert.assertNotEquals(versionFiles1[0].getName(), versionFiles2[0].getName());
consensusImpl.deleteLocalPeer(dataRegionId);
}

public void snapshotUpgradeTest() throws Exception {
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
consensusImpl.triggerSnapshot(dataRegionId, false);
long oldSnapshotIndex = System.currentTimeMillis();
String oldSnapshotDirName =
String.format(
"%s_%s_%d",
IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(), oldSnapshotIndex);
File regionDir = new File(storageDir, "1_1");
File oldSnapshotDir = new File(regionDir, oldSnapshotDirName);
if (oldSnapshotDir.exists()) {
FileUtils.deleteFully(oldSnapshotDir);
}
if (!oldSnapshotDir.mkdirs()) {
throw new ConsensusGroupModifyPeerException(
String.format("%s: cannot mkdir for snapshot", dataRegionId));
}
consensusImpl.triggerSnapshot(dataRegionId, false);
Assert.assertFalse(oldSnapshotDir.exists());

File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId));

File[] snapshotFiles =
dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
Assert.assertNotNull(snapshotFiles);
Assert.assertEquals(1, snapshotFiles.length);
Assert.assertEquals(
oldSnapshotIndex + 1,
Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", "")));
consensusImpl.deleteLocalPeer(dataRegionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,42 +256,29 @@ private TSStatus addPeer() {
TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId);
boolean addPeerSucceed = true;
Throwable throwable = null;
for (int i = 0; i < MAX_RETRY_NUM; i++) {
try {
if (!addPeerSucceed) {
Thread.sleep(SLEEP_MILLIS);
}
addRegionPeer(regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint));
addPeerSucceed = true;
} catch (PeerAlreadyInConsensusGroupException e) {
addPeerSucceed = true;
} catch (InterruptedException e) {
throwable = e;
Thread.currentThread().interrupt();
} catch (ConsensusException e) {
addPeerSucceed = false;
throwable = e;
taskLogger.error(
"{}, executed addPeer {} for region {} error, retry times: {}",
REGION_MIGRATE_PROCESS,
destEndpoint,
regionId,
i,
e);
} catch (Exception e) {
addPeerSucceed = false;
throwable = e;
taskLogger.warn("Unexpected exception", e);
}
if (addPeerSucceed || throwable instanceof InterruptedException) {
break;
}
try {
addRegionPeer(regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint));
} catch (PeerAlreadyInConsensusGroupException ignore) {

} catch (ConsensusException e) {
addPeerSucceed = false;
throwable = e;
taskLogger.error(
"{}, executed addPeer {} for region {} error",
REGION_MIGRATE_PROCESS,
destEndpoint,
regionId,
e);
} catch (Exception e) {
addPeerSucceed = false;
throwable = e;
taskLogger.warn("Unexpected exception", e);
}

if (!addPeerSucceed) {
String errorMsg =
String.format(
"%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s",
"%s, AddPeer for region error, peerId: %s, regionId: %s",
REGION_MIGRATE_PROCESS, destEndpoint, regionId);
taskLogger.error(errorMsg, throwable);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
Expand Down

0 comments on commit 4be9909

Please sign in to comment.