From 4be990922b4038f84f872532891548f4bdcc1c1b Mon Sep 17 00:00:00 2001 From: Li Yu Heng Date: Thu, 12 Dec 2024 20:03:35 +0800 Subject: [PATCH] [region migration] Remove retry of RegionMigrateService::addPeer (#14362) * done * synchronized & snapshot name * remove test * for removePeer and resetPeerList --- .../iotdb/consensus/iot/IoTConsensus.java | 157 +++++++++--------- .../consensus/iot/IoTConsensusServerImpl.java | 20 +-- .../iotdb/consensus/iot/StabilityTest.java | 36 ---- .../db/service/RegionMigrateService.java | 49 ++---- 4 files changed, 103 insertions(+), 159 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index bb4cf5c458de..7f1222e5c678 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -296,53 +296,55 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); - if (impl.getConfiguration().contains(peer)) { - throw new PeerAlreadyInConsensusGroupException(groupId, peer); - } - try { - // step 1: inactive new Peer to prepare for following steps - logger.info("[IoTConsensus] inactivate new peer: {}", peer); - impl.inactivePeer(peer, false); + synchronized (impl) { + if (impl.getConfiguration().contains(peer)) { + throw new PeerAlreadyInConsensusGroupException(groupId, peer); + } + try { + // step 1: inactive new Peer to prepare for following steps + logger.info("[IoTConsensus] inactivate new peer: {}", peer); + impl.inactivePeer(peer, false); - // step 2: notify all the other Peers to build the sync connection to newPeer - logger.info("[IoTConsensus] notify current peers to build sync log..."); - impl.notifyPeersToBuildSyncLogChannel(peer); + // step 2: notify all the other Peers to build the sync connection to newPeer + logger.info("[IoTConsensus] notify current peers to build sync log..."); + impl.notifyPeersToBuildSyncLogChannel(peer); - // step 3: take snapshot - logger.info("[IoTConsensus] start to take snapshot..."); + // step 3: take snapshot + logger.info("[IoTConsensus] start to take snapshot..."); - impl.takeSnapshot(); + impl.takeSnapshot(); - // step 4: transit snapshot - logger.info("[IoTConsensus] start to transmit snapshot..."); - impl.transmitSnapshot(peer); + // step 4: transit snapshot + logger.info("[IoTConsensus] start to transmit snapshot..."); + impl.transmitSnapshot(peer); - // step 5: let the new peer load snapshot - logger.info("[IoTConsensus] trigger new peer to load snapshot..."); - impl.triggerSnapshotLoad(peer); - KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION); + // step 5: let the new peer load snapshot + logger.info("[IoTConsensus] trigger new peer to load snapshot..."); + impl.triggerSnapshotLoad(peer); + KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION); - // step 6: active new Peer - logger.info("[IoTConsensus] activate new peer..."); - impl.activePeer(peer); + // step 6: active new Peer + logger.info("[IoTConsensus] activate new peer..."); + impl.activePeer(peer); + + // step 7: notify remote peer to clean up transferred snapshot + logger.info("[IoTConsensus] clean up remote snapshot..."); + try { + impl.cleanupRemoteSnapshot(peer); + } catch (ConsensusGroupModifyPeerException e) { + logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e); + } + KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE); - // step 7: notify remote peer to clean up transferred snapshot - logger.info("[IoTConsensus] clean up remote snapshot..."); - try { - impl.cleanupRemoteSnapshot(peer); } catch (ConsensusGroupModifyPeerException e) { - logger.warn("[IoTConsensus] failed to cleanup remote snapshot", e); + logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects..."); + // try to clean up the sync log channel + impl.notifyPeersToRemoveSyncLogChannel(peer); + throw new ConsensusException(e); + } finally { + logger.info("[IoTConsensus] clean up local snapshot..."); + impl.cleanupLocalSnapshot(); } - KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE); - - } catch (ConsensusGroupModifyPeerException e) { - logger.info("[IoTConsensus] add remote peer failed, automatic cleanup side effects..."); - // try to clean up the sync log channel - impl.notifyPeersToRemoveSyncLogChannel(peer); - throw new ConsensusException(e); - } finally { - logger.info("[IoTConsensus] clean up local snapshot..."); - impl.cleanupLocalSnapshot(); } } @@ -352,29 +354,32 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); - if (!impl.getConfiguration().contains(peer)) { - throw new PeerNotInConsensusGroupException(groupId, peer.toString()); - } + synchronized (impl) { + if (!impl.getConfiguration().contains(peer)) { + throw new PeerNotInConsensusGroupException(groupId, peer.toString()); + } - KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT); + KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT); - // let other peers remove the sync channel with target peer - impl.notifyPeersToRemoveSyncLogChannel(peer); - KillPoint.setKillPoint( - IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL); + // let other peers remove the sync channel with target peer + impl.notifyPeersToRemoveSyncLogChannel(peer); + KillPoint.setKillPoint( + IoTConsensusRemovePeerCoordinatorKillPoints + .AFTER_NOTIFY_PEERS_TO_REMOVE_REPLICATE_CHANNEL); - try { - // let target peer reject new write - impl.inactivePeer(peer, true); - KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER); - // wait its SyncLog to complete - impl.waitTargetPeerUntilSyncLogCompleted(peer); - // wait its region related resource to release - impl.waitReleaseAllRegionRelatedResource(peer); - } catch (ConsensusGroupModifyPeerException e) { - throw new ConsensusException(e.getMessage()); + try { + // let target peer reject new write + impl.inactivePeer(peer, true); + KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER); + // wait its SyncLog to complete + impl.waitTargetPeerUntilSyncLogCompleted(peer); + // wait its region related resource to release + impl.waitReleaseAllRegionRelatedResource(peer); + } catch (ConsensusGroupModifyPeerException e) { + throw new ConsensusException(e.getMessage()); + } + KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH); } - KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH); } @Override @@ -485,6 +490,7 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + Peer localPeer = new Peer(groupId, thisNodeId, thisNode); if (!correctPeers.contains(localPeer)) { logger.info( @@ -493,25 +499,28 @@ public void resetPeerList(ConsensusGroupId groupId, List correctPeers) deleteLocalPeer(groupId); return; } - ImmutableList currentMembers = ImmutableList.copyOf(impl.getConfiguration()); - String previousPeerListStr = currentMembers.toString(); - for (Peer peer : currentMembers) { - if (!correctPeers.contains(peer)) { - if (!impl.removeSyncLogChannel(peer)) { - logger.error( - "[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}", - peer, - groupId); + + synchronized (impl) { + ImmutableList currentMembers = ImmutableList.copyOf(impl.getConfiguration()); + String previousPeerListStr = currentMembers.toString(); + for (Peer peer : currentMembers) { + if (!correctPeers.contains(peer)) { + if (!impl.removeSyncLogChannel(peer)) { + logger.error( + "[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}", + peer, + groupId); + } } } - } - logger.info( - "[RESET PEER LIST] Local peer list has been reset: {} -> {}", - previousPeerListStr, - impl.getConfiguration()); - for (Peer peer : correctPeers) { - if (!impl.getConfiguration().contains(peer)) { - logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer); + logger.info( + "[RESET PEER LIST] Local peer list has been reset: {} -> {}", + previousPeerListStr, + impl.getConfiguration()); + for (Peer peer : correctPeers) { + if (!impl.getConfiguration().contains(peer)) { + logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer); + } } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 57256b39b6ab..8ea522aea27e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -91,6 +91,7 @@ import java.util.List; import java.util.PriorityQueue; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -276,10 +277,9 @@ public DataSet read(IConsensusRequest request) { public void takeSnapshot() throws ConsensusGroupModifyPeerException { try { - long newSnapshotIndex = getLatestSnapshotIndex() + 1; newSnapshotDirName = String.format( - "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), newSnapshotIndex); + "%s_%s_%s", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), UUID.randomUUID()); File snapshotDir = new File(storageDir, newSnapshotDirName); if (snapshotDir.exists()) { FileUtils.deleteDirectory(snapshotDir); @@ -400,22 +400,6 @@ private String calculateSnapshotPath(String snapshotId, String originalFilePath) return originalFilePath.substring(originalFilePath.indexOf(snapshotId)); } - private long getLatestSnapshotIndex() { - long snapShotIndex = 0; - File directory = new File(storageDir); - File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME)); - if (versionFiles == null || versionFiles.length == 0) { - return snapShotIndex; - } - for (File file : versionFiles) { - snapShotIndex = - Math.max( - snapShotIndex, - Long.parseLong(SNAPSHOT_INDEX_PATTEN.matcher(file.getName()).replaceAll(""))); - } - return snapShotIndex; - } - private void clearOldSnapshot() { File directory = new File(storageDir); File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(SNAPSHOT_DIR_NAME)); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 43e328e38335..d7675084680c 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; -import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; @@ -95,7 +94,6 @@ public void allTest() throws Exception { peerTest(); transferLeader(); snapshotTest(); - snapshotUpgradeTest(); } public void addConsensusGroup() { @@ -212,38 +210,4 @@ public void snapshotTest() throws ConsensusException { Assert.assertNotEquals(versionFiles1[0].getName(), versionFiles2[0].getName()); consensusImpl.deleteLocalPeer(dataRegionId); } - - public void snapshotUpgradeTest() throws Exception { - consensusImpl.createLocalPeer( - dataRegionId, - Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); - consensusImpl.triggerSnapshot(dataRegionId, false); - long oldSnapshotIndex = System.currentTimeMillis(); - String oldSnapshotDirName = - String.format( - "%s_%s_%d", - IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(), oldSnapshotIndex); - File regionDir = new File(storageDir, "1_1"); - File oldSnapshotDir = new File(regionDir, oldSnapshotDirName); - if (oldSnapshotDir.exists()) { - FileUtils.deleteFully(oldSnapshotDir); - } - if (!oldSnapshotDir.mkdirs()) { - throw new ConsensusGroupModifyPeerException( - String.format("%s: cannot mkdir for snapshot", dataRegionId)); - } - consensusImpl.triggerSnapshot(dataRegionId, false); - Assert.assertFalse(oldSnapshotDir.exists()); - - File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId)); - - File[] snapshotFiles = - dataDir.listFiles((dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME)); - Assert.assertNotNull(snapshotFiles); - Assert.assertEquals(1, snapshotFiles.length); - Assert.assertEquals( - oldSnapshotIndex + 1, - Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", ""))); - consensusImpl.deleteLocalPeer(dataRegionId); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index c6551c0d54d2..056df802c138 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -256,42 +256,29 @@ private TSStatus addPeer() { TEndPoint destEndpoint = getConsensusEndPoint(destDataNode, regionId); boolean addPeerSucceed = true; Throwable throwable = null; - for (int i = 0; i < MAX_RETRY_NUM; i++) { - try { - if (!addPeerSucceed) { - Thread.sleep(SLEEP_MILLIS); - } - addRegionPeer(regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint)); - addPeerSucceed = true; - } catch (PeerAlreadyInConsensusGroupException e) { - addPeerSucceed = true; - } catch (InterruptedException e) { - throwable = e; - Thread.currentThread().interrupt(); - } catch (ConsensusException e) { - addPeerSucceed = false; - throwable = e; - taskLogger.error( - "{}, executed addPeer {} for region {} error, retry times: {}", - REGION_MIGRATE_PROCESS, - destEndpoint, - regionId, - i, - e); - } catch (Exception e) { - addPeerSucceed = false; - throwable = e; - taskLogger.warn("Unexpected exception", e); - } - if (addPeerSucceed || throwable instanceof InterruptedException) { - break; - } + try { + addRegionPeer(regionId, new Peer(regionId, destDataNode.getDataNodeId(), destEndpoint)); + } catch (PeerAlreadyInConsensusGroupException ignore) { + + } catch (ConsensusException e) { + addPeerSucceed = false; + throwable = e; + taskLogger.error( + "{}, executed addPeer {} for region {} error", + REGION_MIGRATE_PROCESS, + destEndpoint, + regionId, + e); + } catch (Exception e) { + addPeerSucceed = false; + throwable = e; + taskLogger.warn("Unexpected exception", e); } if (!addPeerSucceed) { String errorMsg = String.format( - "%s, AddPeer for region error after max retry times, peerId: %s, regionId: %s", + "%s, AddPeer for region error, peerId: %s, regionId: %s", REGION_MIGRATE_PROCESS, destEndpoint, regionId); taskLogger.error(errorMsg, throwable); status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());