diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index ce5acf73cc3a..01ec5be565eb 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -347,16 +347,28 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE impl.setRemotePeerActive(peer, false, false); // step 2: notify all the other Peers to create consensus pipes to newPeer - // NOTE: For this step, all the other peers will try to transfer its user write data to target + // NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target + // while other peers record the coordinator's progress. LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME); - impl.notifyPeersToCreateConsensusPipes(peer); + impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer()); KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION); - // step 3: wait until all other Peers finish transferring + // step 3: wait until the coordinator Peer finishes transferring snapshot + LOGGER.info( + "[{}] wait until the coordinator Peer finishes transferring snapshot...", CLASS_NAME); + impl.waitPeersToTargetPeerTransmissionCompleted(peer, true); + + // step 4. start other peers' consensus pipe to target peer to transfer remaining data + // NOTE: For this step, other peers will start to transmit data(may contain both historical + // and realtime data) after the snapshot progress to target. + LOGGER.info("[{}] start transfer remaining data from other peers", CLASS_NAME); + impl.startOtherConsensusPipesToTargetPeer(peer); + + // step 5: wait until all other Peers finish transferring LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME); - impl.waitPeersToTargetPeerTransmissionCompleted(peer); + impl.waitPeersToTargetPeerTransmissionCompleted(peer, false); - // step 4: active new Peer to let new Peer receive client requests + // step 6: active new Peer to let new Peer receive client requests LOGGER.info("[{}] activate new peer...", CLASS_NAME); impl.setRemotePeerActive(peer, true, false); KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE); @@ -407,6 +419,7 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer); impl.waitReleaseAllRegionRelatedResource(peer); } catch (ConsensusGroupModifyPeerException e) { + LOGGER.warn("Remove remote peer failed, may because remote peer is down or other reasons", e); throw new ConsensusException(e); } KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java index b085abbd64b2..f11ef3cf735e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java @@ -95,6 +95,7 @@ public class PipeConsensusServerImpl { private final ReplicateMode replicateMode; private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE; + private ProgressIndex cachedReplicateProgressIndex = MinimumProgressIndex.INSTANCE; public PipeConsensusServerImpl( Peer thisNode, @@ -357,6 +358,10 @@ public TSStatus writeOnFollowerReplica(IConsensusRequest request) { getStateMachineLockTime - consensusWriteStartTime); long writeToStateMachineStartTime = System.nanoTime(); + if (request instanceof ComparableConsensusRequest) { + progressIndexManager.recordPeerMaxProgressIndex( + thisNode.getGroupId(), ((ComparableConsensusRequest) request).getProgressIndex()); + } TSStatus result = stateMachine.write(request); long writeToStateMachineEndTime = System.nanoTime(); @@ -371,6 +376,10 @@ public TSStatus writeOnFollowerReplica(IConsensusRequest request) { } } + public void recordTsFileProgressOnFollowerReplica(ProgressIndex progressIndex) { + progressIndexManager.recordPeerMaxProgressIndex(thisNode.getGroupId(), progressIndex); + } + public DataSet read(IConsensusRequest request) { return stateMachine.read(request); } @@ -401,13 +410,14 @@ public void setRemotePeerActive(Peer peer, boolean isActive, boolean isForDeleti } } - public void notifyPeersToCreateConsensusPipes(Peer targetPeer) + public void notifyPeersToCreateConsensusPipes(Peer targetPeer, Peer coordinatorPeer) throws ConsensusGroupModifyPeerException { final List otherPeers = peerManager.getOtherPeers(thisNode); for (Peer peer : otherPeers) { if (peer.equals(targetPeer)) { continue; } + // other peer will manually start pipe task after coordinator finishing transferring snapshot. try (SyncPipeConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) { TNotifyPeerToCreateConsensusPipeResp resp = @@ -415,7 +425,9 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer) new TNotifyPeerToCreateConsensusPipeReq( targetPeer.getGroupId().convertToTConsensusGroupId(), targetPeer.getEndpoint(), - targetPeer.getNodeId())); + targetPeer.getNodeId(), + coordinatorPeer.getEndpoint(), + coordinatorPeer.getNodeId())); if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) { throw new ConsensusGroupModifyPeerException( String.format("error when notify peer %s to create consensus pipe", peer)); @@ -432,7 +444,7 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer) try { // This node which acts as coordinator will transfer complete historical snapshot to new // target. - createConsensusPipeToTargetPeer(targetPeer, false); + createConsensusPipeToTargetPeer(targetPeer, coordinatorPeer, false); } catch (Exception e) { LOGGER.warn( "{} cannot create consensus pipe to {}, may because target peer is unknown currently, please manually check!", @@ -444,10 +456,12 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer) } public synchronized void createConsensusPipeToTargetPeer( - Peer targetPeer, boolean needManuallyStart) throws ConsensusGroupModifyPeerException { + Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean needManuallyStart) + throws ConsensusGroupModifyPeerException { try { KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE); - consensusPipeManager.createConsensusPipe(thisNode, targetPeer, needManuallyStart); + consensusPipeManager.createConsensusPipe( + thisNode, targetPeer, regionMigrationCoordinatorPeer, needManuallyStart); peerManager.addAndPersist(targetPeer); } catch (IOException e) { LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e); @@ -535,34 +549,56 @@ public void startOtherConsensusPipesToTargetPeer(Peer targetPeer) } /** Wait for the user written data up to firstCheck to be replicated */ - public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer) - throws ConsensusGroupModifyPeerException { + public void waitPeersToTargetPeerTransmissionCompleted( + Peer targetPeer, boolean checkForCoordinator) throws ConsensusGroupModifyPeerException { boolean isTransmissionCompleted = false; boolean isFirstCheckForCurrentPeer = true; boolean isFirstCheckForOtherPeers = true; try { - while (!isTransmissionCompleted) { - Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS); - - if (isConsensusPipesTransmissionCompleted( - Collections.singletonList(new ConsensusPipeName(thisNode, targetPeer).toString()), - isFirstCheckForCurrentPeer)) { + if (checkForCoordinator) { + ConsensusPipeName consensusPipeName = new ConsensusPipeName(thisNode, targetPeer); + while (!isTransmissionCompleted) { + // Only wait coordinator to transfer snapshot instead of waiting all peers completing data + // transfer. Keep consistent with IoTV1. + isTransmissionCompleted = + isConsensusPipesTransmissionCompleted( + Collections.singletonList(consensusPipeName.toString()), + isFirstCheckForCurrentPeer, + true); + + isFirstCheckForCurrentPeer = false; + + Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS); + } + // Reset coordinator's processor filter switch + ConsensusPipeManager.getProcessorFilterSwitch().put(consensusPipeName, true); + } else { + while (!isTransmissionCompleted) { final List otherPeers = peerManager.getOtherPeers(thisNode); - isTransmissionCompleted = true; for (Peer peer : otherPeers) { if (!peer.equals(targetPeer)) { - isTransmissionCompleted &= - isRemotePeerConsensusPipesTransmissionCompleted( - peer, - Collections.singletonList(new ConsensusPipeName(peer, targetPeer).toString()), - isFirstCheckForOtherPeers); + try { + isTransmissionCompleted &= + isRemotePeerConsensusPipesTransmissionCompleted( + peer, + Collections.singletonList( + new ConsensusPipeName(peer, targetPeer).toString()), + isFirstCheckForOtherPeers); + } catch (Exception e) { + LOGGER.warn( + "{} failed to check remote peer{}'s transmission progress, may because this peer has down. Ignore this exception and move on", + thisNode, + peer, + e); + } } + isFirstCheckForOtherPeers = false; + + Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS); } - isFirstCheckForOtherPeers = false; } - isFirstCheckForCurrentPeer = false; } } catch (InterruptedException e) { LOGGER.warn("{} is interrupted when waiting for transfer completed", thisNode, e); @@ -630,21 +666,41 @@ private boolean isRemotePeerConsensusPipesTransmissionCompleted( } public synchronized boolean isConsensusPipesTransmissionCompleted( - List consensusPipeNames, boolean refreshCachedProgressIndex) { + List consensusPipeNames, + boolean refreshCachedProgressIndex, + boolean checkForCoordinator) { if (refreshCachedProgressIndex) { cachedProgressIndex = cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( progressIndexManager.getMaxAssignedProgressIndex(thisNode.getGroupId())); + + cachedReplicateProgressIndex = + cachedReplicateProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( + progressIndexManager.getMaxReplicatedProgressIndex(thisNode.getGroupId())); } try { - return consensusPipeNames.stream() - .noneMatch( - name -> - cachedProgressIndex.isAfter( - progressIndexManager.getProgressIndex(new ConsensusPipeName(name)))); + if (checkForCoordinator) { + // check all types of write(user local write and replica write), because coordinator will + // transfer data regardless write type. + return consensusPipeNames.stream() + .allMatch( + name -> { + ProgressIndex currentProgressIndex = + progressIndexManager.getProgressIndex(new ConsensusPipeName(name)); + return currentProgressIndex.isAfter(cachedProgressIndex) + && currentProgressIndex.isAfter(cachedReplicateProgressIndex); + }); + } else { + // only check user local write progressIndex + return consensusPipeNames.stream() + .noneMatch( + name -> + cachedProgressIndex.isAfter( + progressIndexManager.getProgressIndex(new ConsensusPipeName(name)))); + } } catch (PipeException e) { - LOGGER.info(e.getMessage()); + LOGGER.warn(e.getMessage(), e); return false; } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java index 1fc2abe03bf6..eac150364eec 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java @@ -29,6 +29,7 @@ import org.apache.commons.lang3.tuple.Triple; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME; @@ -41,15 +42,19 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_CONSENSUS_PIPE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY; public class ConsensusPipeManager { // Extract data.insert and data.delete to support deletion. private static final String CONSENSUS_EXTRACTOR_INCLUSION_VALUE = "data"; + private static final Map processorFilterSwitchMap = + new ConcurrentHashMap<>(); private final PipeConsensusConfig.Pipe config; private final ReplicateMode replicateMode; private final ConsensusPipeDispatcher dispatcher; @@ -65,10 +70,11 @@ public ConsensusPipeManager(PipeConsensusConfig.Pipe config, ReplicateMode repli /** This method is used except region migration. */ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Exception { ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer); + processorFilterSwitchMap.put(consensusPipeName, true); // The third parameter is only used when region migration. Since this method is not called by // region migration, just pass senderPeer in to get the correct result. Triple, ImmutableMap, ImmutableMap> - params = buildPipeParams(senderPeer, receiverPeer); + params = buildPipeParams(senderPeer, receiverPeer, senderPeer); dispatcher.createPipe( consensusPipeName.toString(), params.getLeft(), @@ -78,11 +84,18 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Excep } /** This method is used when executing region migration */ - public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean needManuallyStart) + public void createConsensusPipe( + Peer senderPeer, + Peer receiverPeer, + Peer regionMigrationCoordinatorPeer, + boolean needManuallyStart) throws Exception { ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer); + // coordinator which doesn't need manually start turns off the filter switch + processorFilterSwitchMap.put(consensusPipeName, needManuallyStart); + Triple, ImmutableMap, ImmutableMap> - params = buildPipeParams(senderPeer, receiverPeer); + params = buildPipeParams(senderPeer, receiverPeer, regionMigrationCoordinatorPeer); dispatcher.createPipe( consensusPipeName.toString(), params.getLeft(), @@ -93,9 +106,10 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean need public Triple< ImmutableMap, ImmutableMap, ImmutableMap> - buildPipeParams(Peer senderPeer, Peer receiverPeer) { + buildPipeParams(Peer senderPeer, Peer receiverPeer, Peer coordinatorPeer) { ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer); - return new ImmutableTriple<>( + + ImmutableMap basicExtractorParams = ImmutableMap.builder() .put(EXTRACTOR_KEY, config.getExtractorPluginName()) .put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE) @@ -111,9 +125,24 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean need .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue()) .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true)) .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true)) - .build(), + .build(); + ImmutableMap extractorParams; + if (senderPeer.equals(coordinatorPeer)) { + extractorParams = basicExtractorParams; + } else { + extractorParams = + ImmutableMap.builder() + .putAll(basicExtractorParams) + .put( + EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY, + String.valueOf(new ConsensusPipeName(senderPeer, coordinatorPeer))) + .build(); + } + return new ImmutableTriple<>( + extractorParams, ImmutableMap.builder() .put(PROCESSOR_KEY, config.getProcessorPluginName()) + .put(PROCESSOR_CONSENSUS_PIPE_NAME_KEY, consensusPipeName.toString()) .build(), ImmutableMap.builder() .put(CONNECTOR_KEY, config.getConnectorPluginName()) @@ -131,6 +160,7 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean need public void dropConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Exception { ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer); dispatcher.dropPipe(consensusPipeName); + processorFilterSwitchMap.remove(consensusPipeName); } public void updateConsensusPipe(ConsensusPipeName consensusPipeName, PipeStatus pipeStatus) @@ -141,6 +171,7 @@ public void updateConsensusPipe(ConsensusPipeName consensusPipeName, PipeStatus dispatcher.stopPipe(consensusPipeName.toString()); } else if (PipeStatus.DROPPED.equals(pipeStatus)) { dispatcher.dropPipe(consensusPipeName); + processorFilterSwitchMap.remove(consensusPipeName); } else { throw new IllegalArgumentException("Unsupported pipe status: " + pipeStatus); } @@ -149,4 +180,8 @@ public void updateConsensusPipe(ConsensusPipeName consensusPipeName, PipeStatus public Map getAllConsensusPipe() { return selector.getAllConsensusPipe(); } + + public static Map getProcessorFilterSwitch() { + return processorFilterSwitchMap; + } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java index e5dc2e922ab6..5fcf22bfcc2f 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ProgressIndexManager.java @@ -28,4 +28,8 @@ public interface ProgressIndexManager { ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId); ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId); + + void recordPeerMaxProgressIndex(ConsensusGroupId consensusGroupId, ProgressIndex progressIndex); + + ProgressIndex getMaxReplicatedProgressIndex(ConsensusGroupId consensusGroupId); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java index 3aa69af6ff50..fcf6101922f0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java @@ -126,7 +126,11 @@ public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe( ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId), req.targetPeerNodeId, req.targetPeerEndPoint), - false); + new Peer( + ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId), + req.coordinatorPeerNodeId, + req.coordinatorPeerEndPoint), + true); responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (ConsensusGroupModifyPeerException e) { responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -188,7 +192,7 @@ public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted( try { isCompleted = impl.isConsensusPipesTransmissionCompleted( - req.consensusPipeNames, req.refreshCachedProgressIndex); + req.consensusPipeNames, req.refreshCachedProgressIndex, false); responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (Exception e) { responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java index e9ae4ff4454d..5713499ec054 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java @@ -34,6 +34,9 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -41,11 +44,16 @@ import java.util.stream.Collectors; public class ProgressIndexDataNodeManager implements ProgressIndexManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ProgressIndexDataNodeManager.class); private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private final Map groupId2MaxProgressIndex; + // 1 dataRegion -> 1 recoverProgressIndex with n peers(denoted by datanodeId) -> n + // simpleProgressIndex. + private final Map groupId2PeersMaxProgressIndexMap; public ProgressIndexDataNodeManager() { this.groupId2MaxProgressIndex = new ConcurrentHashMap<>(); + this.groupId2PeersMaxProgressIndexMap = new ConcurrentHashMap<>(); recoverMaxProgressIndexFromDataRegion(); } @@ -74,6 +82,20 @@ public static ProgressIndex extractLocalSimpleProgressIndex(ProgressIndex progre return MinimumProgressIndex.INSTANCE; } + public static ProgressIndex extractRecoverProgressIndex(ProgressIndex progressIndex) { + if (progressIndex instanceof RecoverProgressIndex) { + return progressIndex; + } else if (progressIndex instanceof HybridProgressIndex) { + final Map type2Index = + ((HybridProgressIndex) progressIndex).getType2Index(); + if (!type2Index.containsKey(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType())) { + return MinimumProgressIndex.INSTANCE; + } + type2Index.get(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType()); + } + return MinimumProgressIndex.INSTANCE; + } + private void recoverMaxProgressIndexFromDataRegion() { StorageEngine.getInstance() .getAllDataRegionIds() @@ -93,11 +115,23 @@ private void recoverMaxProgressIndexFromDataRegion() { .collect(Collectors.toList())); ProgressIndex maxProgressIndex = MinimumProgressIndex.INSTANCE; + ProgressIndex maxPeersProgressIndex = MinimumProgressIndex.INSTANCE; for (ProgressIndex progressIndex : allProgressIndex) { + ProgressIndex extractedProgressIndex = + extractLocalSimpleProgressIndex(progressIndex); maxProgressIndex = - maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( - extractLocalSimpleProgressIndex(progressIndex)); + extractedProgressIndex instanceof SimpleProgressIndex + ? maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( + new RecoverProgressIndex( + DATA_NODE_ID, (SimpleProgressIndex) extractedProgressIndex)) + : maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( + extractedProgressIndex); + + maxPeersProgressIndex = + maxPeersProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex( + extractRecoverProgressIndex(progressIndex)); } + // Renew a variable to pass the examination of compiler final ProgressIndex finalMaxProgressIndex = maxProgressIndex; groupId2MaxProgressIndex.compute( @@ -105,6 +139,12 @@ private void recoverMaxProgressIndexFromDataRegion() { (key, value) -> (value == null ? MinimumProgressIndex.INSTANCE : value) .updateToMinimumEqualOrIsAfterProgressIndex(finalMaxProgressIndex)); + final ProgressIndex finalMaxPeersProgressIndex = maxPeersProgressIndex; + groupId2PeersMaxProgressIndexMap.compute( + dataRegionId, + (key, value) -> + (value == null ? MinimumProgressIndex.INSTANCE : value) + .updateToMinimumEqualOrIsAfterProgressIndex(finalMaxPeersProgressIndex)); }); } @@ -129,4 +169,44 @@ public ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId) { public ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId) { return groupId2MaxProgressIndex.getOrDefault(consensusGroupId, MinimumProgressIndex.INSTANCE); } + + @Override + public void recordPeerMaxProgressIndex( + ConsensusGroupId consensusGroupId, ProgressIndex progressIndex) { + RecoverProgressIndex recoverProgressIndex = getRecoverProgressIndex(progressIndex); + if (recoverProgressIndex == null) { + // IoTV2 uses recoverProgressIndex + LOGGER.warn( + "[THIS SHOULD NOT HAPPEN] The progress index is null. ConsensusGroupId: {}.", + consensusGroupId); + return; + } + + groupId2PeersMaxProgressIndexMap.compute( + consensusGroupId, + (key, value) -> + ((value == null ? MinimumProgressIndex.INSTANCE : value) + .updateToMinimumEqualOrIsAfterProgressIndex(recoverProgressIndex))); + } + + private static RecoverProgressIndex getRecoverProgressIndex(ProgressIndex progressIndex) { + RecoverProgressIndex recoverProgressIndex = null; + + if (progressIndex instanceof RecoverProgressIndex) { + recoverProgressIndex = (RecoverProgressIndex) progressIndex; + } else if (progressIndex instanceof HybridProgressIndex) { + final Map type2Index = + ((HybridProgressIndex) progressIndex).getType2Index(); + recoverProgressIndex = + ((RecoverProgressIndex) + type2Index.getOrDefault(ProgressIndexType.RECOVER_PROGRESS_INDEX.getType(), null)); + } + return recoverProgressIndex; + } + + @Override + public ProgressIndex getMaxReplicatedProgressIndex(ConsensusGroupId consensusGroupId) { + return this.groupId2PeersMaxProgressIndexMap.getOrDefault( + consensusGroupId, MinimumProgressIndex.INSTANCE); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index 011bfcc389ab..179e3c0a4f90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -31,6 +31,8 @@ import org.apache.iotdb.commons.pipe.datastructure.PersistentResource; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; @@ -71,6 +73,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; @@ -305,7 +308,21 @@ public void customize( pipeName = environment.getPipeName(); creationTime = environment.getCreationTime(); pipeTaskMeta = environment.getPipeTaskMeta(); - startIndex = environment.getPipeTaskMeta().getProgressIndex(); + if (parameters.hasAnyAttributes(EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY)) { + ConsensusPipeName currentNode2CoordinatorPipeName = + new ConsensusPipeName( + parameters.getString(EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY)); + // For region migration in IoTV2, non-coordinators will only transfer data after + // `ProgressIndex(non-coordinators2coordinator)` + startIndex = + PipeDataNodeAgent.task() + .getPipeTaskProgressIndex( + currentNode2CoordinatorPipeName.toString(), + currentNode2CoordinatorPipeName.getConsensusGroupId().getId()); + LOGGER.info("Debug: startIndex: {}", startIndex); + } else { + startIndex = environment.getPipeTaskMeta().getProgressIndex(); + } dataRegionId = environment.getRegionId(); synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java index bbb02084514a..fa3d10746ab2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/pipeconsensus/PipeConsensusProcessor.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager; +import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.PipeProcessor; @@ -37,15 +39,21 @@ import java.util.Map; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_CONSENSUS_PIPE_NAME_KEY; + public class PipeConsensusProcessor implements PipeProcessor { private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + private ConsensusPipeName consensusPipeName; @Override public void validate(PipeParameterValidator validator) throws Exception {} @Override public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) - throws Exception {} + throws Exception { + consensusPipeName = + new ConsensusPipeName(parameters.getString(PROCESSOR_CONSENSUS_PIPE_NAME_KEY)); + } private boolean isContainLocalData(EnrichedEvent enrichedEvent) { final ProgressIndex progressIndex = enrichedEvent.getProgressIndex(); @@ -70,12 +78,16 @@ private boolean isContainLocalData(EnrichedEvent enrichedEvent) { @Override public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector) throws Exception { - // Only user-generated TsFileInsertionEvent can be replicated. Any tsFile synchronized from a - // replica should not be replicated again - if (tsFileInsertionEvent instanceof EnrichedEvent - && !((PipeTsFileInsertionEvent) tsFileInsertionEvent).isGeneratedByPipeConsensus()) { + if (tsFileInsertionEvent instanceof EnrichedEvent) { + if (!needFilter()) { + eventCollector.collect(tsFileInsertionEvent); + return; + } + // Only user-generated TsFileInsertionEvent can be replicated. Any tsFile synchronized from a + // replica should not be replicated again final EnrichedEvent enrichedEvent = (EnrichedEvent) tsFileInsertionEvent; - if (isContainLocalData(enrichedEvent)) { + if (!((PipeTsFileInsertionEvent) tsFileInsertionEvent).isGeneratedByPipeConsensus() + && isContainLocalData(enrichedEvent)) { eventCollector.collect(tsFileInsertionEvent); } } @@ -84,8 +96,12 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev @Override public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) throws Exception { - // Only user-generated TabletInsertionEvent can be replicated. if (tabletInsertionEvent instanceof EnrichedEvent) { + if (!needFilter()) { + eventCollector.collect(tabletInsertionEvent); + return; + } + // Only user-generated TabletInsertionEvent can be replicated. final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent; if (isContainLocalData(enrichedEvent)) { eventCollector.collect(tabletInsertionEvent); @@ -96,6 +112,11 @@ public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector ev @Override public void process(Event event, EventCollector eventCollector) throws Exception { if (event instanceof EnrichedEvent) { + if (!needFilter()) { + eventCollector.collect(event); + return; + } + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; if (isContainLocalData(enrichedEvent)) { eventCollector.collect(event); @@ -105,4 +126,9 @@ public void process(Event event, EventCollector eventCollector) throws Exception @Override public void close() throws Exception {} + + private boolean needFilter() { + return ConsensusPipeManager.getProcessorFilterSwitch() + .getOrDefault(this.consensusPipeName, true); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 32ead7587e37..5d04ed1ccaf4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -88,8 +88,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import static org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils.generateTsFileResource; - public class PipeConsensusReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusReceiver.class); private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig(); @@ -668,6 +666,9 @@ private TSStatus loadFileToDataRegion(String filePath, ProgressIndex progressInd consensusPipeName, filePath); } + // record replicated progressIndex + Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId)) + .ifPresent(impl -> impl.recordTsFileProgressOnFollowerReplica(progressIndex)); return RpcUtils.SUCCESS_STATUS; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 7522cd444fe0..bc57308f1e88 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -135,6 +135,8 @@ public class PipeExtractorConstant { "extractor.consensus.sender-dn-id"; public static final String EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY = "extractor.consensus.receiver-dn-id"; + public static final String EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY = + "extractor.consensus.restore-progress-pipe-task-name"; private PipeExtractorConstant() { throw new IllegalStateException("Utility class"); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 3874be8e817f..c71f44d8221a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -82,6 +82,8 @@ public class PipeProcessorConstant { public static final String PROCESSOR_RENAME_DATABASE_NEW_DB_NAME = "processor.new-db-name"; + public static final String PROCESSOR_CONSENSUS_PIPE_NAME_KEY = "processor.consensus.pipe.name-id"; + private PipeProcessorConstant() { throw new IllegalStateException("Utility class"); } diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift index ca7a043c007c..3d3bec771db0 100644 --- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift +++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift @@ -63,6 +63,8 @@ struct TNotifyPeerToCreateConsensusPipeReq { 1: required common.TConsensusGroupId targetPeerConsensusGroupId 2: required common.TEndPoint targetPeerEndPoint 3: required i32 targetPeerNodeId + 4: required common.TEndPoint coordinatorPeerEndPoint + 5: required i32 coordinatorPeerNodeId } struct TNotifyPeerToCreateConsensusPipeResp {