Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Region Migration] Refactor consistency mechanism for IoTV2 #14455

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,28 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
impl.setRemotePeerActive(peer, false, false);

// step 2: notify all the other Peers to create consensus pipes to newPeer
// NOTE: For this step, all the other peers will try to transfer its user write data to target
// NOTE: For this step, coordinator(thisNode) will transfer its full data snapshot to target
// while other peers record the coordinator's progress.
LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME);
impl.notifyPeersToCreateConsensusPipes(peer);
impl.notifyPeersToCreateConsensusPipes(peer, impl.getThisNodePeer());
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

// step 3: wait until all other Peers finish transferring
// step 3: wait until the coordinator Peer finishes transferring snapshot
LOGGER.info(
"[{}] wait until the coordinator Peer finishes transferring snapshot...", CLASS_NAME);
impl.waitPeersToTargetPeerTransmissionCompleted(peer, true);

// step 4. start other peers' consensus pipe to target peer to transfer remaining data
// NOTE: For this step, other peers will start to transmit data (which may contain both
// historical and realtime data) after the snapshot progress point to the target.
LOGGER.info("[{}] start transfer remaining data from other peers", CLASS_NAME);
impl.startOtherConsensusPipesToTargetPeer(peer);

// step 5: wait until all other Peers finish transferring
LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
impl.waitPeersToTargetPeerTransmissionCompleted(peer);
impl.waitPeersToTargetPeerTransmissionCompleted(peer, false);

// step 4: active new Peer to let new Peer receive client requests
// step 6: active new Peer to let new Peer receive client requests
LOGGER.info("[{}] activate new peer...", CLASS_NAME);
impl.setRemotePeerActive(peer, true, false);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
Expand Down Expand Up @@ -407,6 +419,7 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens
LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
impl.waitReleaseAllRegionRelatedResource(peer);
} catch (ConsensusGroupModifyPeerException e) {
LOGGER.warn("Remove remote peer failed, may because remote peer is down or other reasons", e);
throw new ConsensusException(e);
}
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class PipeConsensusServerImpl {
private final ReplicateMode replicateMode;

private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE;
private ProgressIndex cachedReplicateProgressIndex = MinimumProgressIndex.INSTANCE;

public PipeConsensusServerImpl(
Peer thisNode,
Expand Down Expand Up @@ -357,6 +358,10 @@ public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
getStateMachineLockTime - consensusWriteStartTime);

long writeToStateMachineStartTime = System.nanoTime();
if (request instanceof ComparableConsensusRequest) {
progressIndexManager.recordPeerMaxProgressIndex(
thisNode.getGroupId(), ((ComparableConsensusRequest) request).getProgressIndex());
}
TSStatus result = stateMachine.write(request);
long writeToStateMachineEndTime = System.nanoTime();

Expand All @@ -371,6 +376,10 @@ public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
}
}

public void recordTsFileProgressOnFollowerReplica(ProgressIndex progressIndex) {
progressIndexManager.recordPeerMaxProgressIndex(thisNode.getGroupId(), progressIndex);
}

public DataSet read(IConsensusRequest request) {
return stateMachine.read(request);
}
Expand Down Expand Up @@ -401,21 +410,24 @@ public void setRemotePeerActive(Peer peer, boolean isActive, boolean isForDeleti
}
}

public void notifyPeersToCreateConsensusPipes(Peer targetPeer)
public void notifyPeersToCreateConsensusPipes(Peer targetPeer, Peer coordinatorPeer)
throws ConsensusGroupModifyPeerException {
final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
for (Peer peer : otherPeers) {
if (peer.equals(targetPeer)) {
continue;
}
// other peers will manually start their pipe tasks after the coordinator finishes transferring the snapshot.
try (SyncPipeConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TNotifyPeerToCreateConsensusPipeResp resp =
client.notifyPeerToCreateConsensusPipe(
new TNotifyPeerToCreateConsensusPipeReq(
targetPeer.getGroupId().convertToTConsensusGroupId(),
targetPeer.getEndpoint(),
targetPeer.getNodeId()));
targetPeer.getNodeId(),
coordinatorPeer.getEndpoint(),
coordinatorPeer.getNodeId()));
if (!RpcUtils.SUCCESS_STATUS.equals(resp.getStatus())) {
throw new ConsensusGroupModifyPeerException(
String.format("error when notify peer %s to create consensus pipe", peer));
Expand All @@ -432,7 +444,7 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer)
try {
// This node which acts as coordinator will transfer complete historical snapshot to new
// target.
createConsensusPipeToTargetPeer(targetPeer, false);
createConsensusPipeToTargetPeer(targetPeer, coordinatorPeer, false);
} catch (Exception e) {
LOGGER.warn(
"{} cannot create consensus pipe to {}, may because target peer is unknown currently, please manually check!",
Expand All @@ -444,10 +456,12 @@ public void notifyPeersToCreateConsensusPipes(Peer targetPeer)
}

public synchronized void createConsensusPipeToTargetPeer(
Peer targetPeer, boolean needManuallyStart) throws ConsensusGroupModifyPeerException {
Peer targetPeer, Peer regionMigrationCoordinatorPeer, boolean needManuallyStart)
throws ConsensusGroupModifyPeerException {
try {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
consensusPipeManager.createConsensusPipe(thisNode, targetPeer, needManuallyStart);
consensusPipeManager.createConsensusPipe(
thisNode, targetPeer, regionMigrationCoordinatorPeer, needManuallyStart);
peerManager.addAndPersist(targetPeer);
} catch (IOException e) {
LOGGER.warn("{} cannot persist peer {}", thisNode, targetPeer, e);
Expand Down Expand Up @@ -535,34 +549,56 @@ public void startOtherConsensusPipesToTargetPeer(Peer targetPeer)
}

/** Wait for the user written data up to firstCheck to be replicated */
public void waitPeersToTargetPeerTransmissionCompleted(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
public void waitPeersToTargetPeerTransmissionCompleted(
Peer targetPeer, boolean checkForCoordinator) throws ConsensusGroupModifyPeerException {
boolean isTransmissionCompleted = false;
boolean isFirstCheckForCurrentPeer = true;
boolean isFirstCheckForOtherPeers = true;

try {
while (!isTransmissionCompleted) {
Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);

if (isConsensusPipesTransmissionCompleted(
Collections.singletonList(new ConsensusPipeName(thisNode, targetPeer).toString()),
isFirstCheckForCurrentPeer)) {
if (checkForCoordinator) {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(thisNode, targetPeer);
while (!isTransmissionCompleted) {
// Only wait for the coordinator to transfer the snapshot instead of waiting for all peers
// to complete data transfer. Keep consistent with IoTV1.
isTransmissionCompleted =
isConsensusPipesTransmissionCompleted(
Collections.singletonList(consensusPipeName.toString()),
isFirstCheckForCurrentPeer,
true);

isFirstCheckForCurrentPeer = false;

Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
}
// Reset coordinator's processor filter switch
ConsensusPipeManager.getProcessorFilterSwitch().put(consensusPipeName, true);
} else {
while (!isTransmissionCompleted) {
final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);

isTransmissionCompleted = true;
for (Peer peer : otherPeers) {
if (!peer.equals(targetPeer)) {
isTransmissionCompleted &=
isRemotePeerConsensusPipesTransmissionCompleted(
peer,
Collections.singletonList(new ConsensusPipeName(peer, targetPeer).toString()),
isFirstCheckForOtherPeers);
try {
isTransmissionCompleted &=
isRemotePeerConsensusPipesTransmissionCompleted(
peer,
Collections.singletonList(
new ConsensusPipeName(peer, targetPeer).toString()),
isFirstCheckForOtherPeers);
} catch (Exception e) {
LOGGER.warn(
"{} failed to check remote peer{}'s transmission progress, may because this peer has down. Ignore this exception and move on",
thisNode,
peer,
e);
}
}
isFirstCheckForOtherPeers = false;

Thread.sleep(CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS);
}
isFirstCheckForOtherPeers = false;
}
isFirstCheckForCurrentPeer = false;
}
} catch (InterruptedException e) {
LOGGER.warn("{} is interrupted when waiting for transfer completed", thisNode, e);
Expand Down Expand Up @@ -630,21 +666,41 @@ private boolean isRemotePeerConsensusPipesTransmissionCompleted(
}

public synchronized boolean isConsensusPipesTransmissionCompleted(
List<String> consensusPipeNames, boolean refreshCachedProgressIndex) {
List<String> consensusPipeNames,
boolean refreshCachedProgressIndex,
boolean checkForCoordinator) {
if (refreshCachedProgressIndex) {
cachedProgressIndex =
cachedProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
progressIndexManager.getMaxAssignedProgressIndex(thisNode.getGroupId()));

cachedReplicateProgressIndex =
cachedReplicateProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
progressIndexManager.getMaxReplicatedProgressIndex(thisNode.getGroupId()));
}

try {
return consensusPipeNames.stream()
.noneMatch(
name ->
cachedProgressIndex.isAfter(
progressIndexManager.getProgressIndex(new ConsensusPipeName(name))));
if (checkForCoordinator) {
// check all types of writes (user local writes and replica writes), because the coordinator
// will transfer data regardless of write type.
return consensusPipeNames.stream()
.allMatch(
name -> {
ProgressIndex currentProgressIndex =
progressIndexManager.getProgressIndex(new ConsensusPipeName(name));
return currentProgressIndex.isAfter(cachedProgressIndex)
&& currentProgressIndex.isAfter(cachedReplicateProgressIndex);
});
} else {
// only check user local write progressIndex
return consensusPipeNames.stream()
.noneMatch(
name ->
cachedProgressIndex.isAfter(
progressIndexManager.getProgressIndex(new ConsensusPipeName(name))));
}
} catch (PipeException e) {
LOGGER.info(e.getMessage());
LOGGER.warn(e.getMessage(), e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang3.tuple.Triple;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
Expand All @@ -41,15 +42,19 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_CONSENSUS_PIPE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;

public class ConsensusPipeManager {
// Extract data.insert and data.delete to support deletion.
private static final String CONSENSUS_EXTRACTOR_INCLUSION_VALUE = "data";
private static final Map<ConsensusPipeName, Boolean> processorFilterSwitchMap =
new ConcurrentHashMap<>();
private final PipeConsensusConfig.Pipe config;
private final ReplicateMode replicateMode;
private final ConsensusPipeDispatcher dispatcher;
Expand All @@ -65,10 +70,11 @@ public ConsensusPipeManager(PipeConsensusConfig.Pipe config, ReplicateMode repli
/** This method is used except region migration. */
public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Exception {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
processorFilterSwitchMap.put(consensusPipeName, true);
// The third parameter is only used when region migration. Since this method is not called by
// region migration, just pass senderPeer in to get the correct result.
Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
params = buildPipeParams(senderPeer, receiverPeer);
params = buildPipeParams(senderPeer, receiverPeer, senderPeer);
dispatcher.createPipe(
consensusPipeName.toString(),
params.getLeft(),
Expand All @@ -78,11 +84,18 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Excep
}

/** This method is used when executing region migration */
public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean needManuallyStart)
public void createConsensusPipe(
Peer senderPeer,
Peer receiverPeer,
Peer regionMigrationCoordinatorPeer,
boolean needManuallyStart)
throws Exception {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
// the coordinator, which doesn't need a manual start, turns off the filter switch
processorFilterSwitchMap.put(consensusPipeName, needManuallyStart);

Triple<ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
params = buildPipeParams(senderPeer, receiverPeer);
params = buildPipeParams(senderPeer, receiverPeer, regionMigrationCoordinatorPeer);
dispatcher.createPipe(
consensusPipeName.toString(),
params.getLeft(),
Expand All @@ -93,9 +106,10 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean need

public Triple<
ImmutableMap<String, String>, ImmutableMap<String, String>, ImmutableMap<String, String>>
buildPipeParams(Peer senderPeer, Peer receiverPeer) {
buildPipeParams(Peer senderPeer, Peer receiverPeer, Peer coordinatorPeer) {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
return new ImmutableTriple<>(

ImmutableMap<String, String> basicExtractorParams =
ImmutableMap.<String, String>builder()
.put(EXTRACTOR_KEY, config.getExtractorPluginName())
.put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
Expand All @@ -111,9 +125,24 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean need
.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
.put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
.put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
.build(),
.build();
ImmutableMap<String, String> extractorParams;
if (senderPeer.equals(coordinatorPeer)) {
extractorParams = basicExtractorParams;
} else {
extractorParams =
ImmutableMap.<String, String>builder()
.putAll(basicExtractorParams)
.put(
EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY,
String.valueOf(new ConsensusPipeName(senderPeer, coordinatorPeer)))
.build();
}
return new ImmutableTriple<>(
extractorParams,
ImmutableMap.<String, String>builder()
.put(PROCESSOR_KEY, config.getProcessorPluginName())
.put(PROCESSOR_CONSENSUS_PIPE_NAME_KEY, consensusPipeName.toString())
.build(),
ImmutableMap.<String, String>builder()
.put(CONNECTOR_KEY, config.getConnectorPluginName())
Expand All @@ -131,6 +160,7 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer, boolean need
public void dropConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Exception {
ConsensusPipeName consensusPipeName = new ConsensusPipeName(senderPeer, receiverPeer);
dispatcher.dropPipe(consensusPipeName);
processorFilterSwitchMap.remove(consensusPipeName);
}

public void updateConsensusPipe(ConsensusPipeName consensusPipeName, PipeStatus pipeStatus)
Expand All @@ -141,6 +171,7 @@ public void updateConsensusPipe(ConsensusPipeName consensusPipeName, PipeStatus
dispatcher.stopPipe(consensusPipeName.toString());
} else if (PipeStatus.DROPPED.equals(pipeStatus)) {
dispatcher.dropPipe(consensusPipeName);
processorFilterSwitchMap.remove(consensusPipeName);
} else {
throw new IllegalArgumentException("Unsupported pipe status: " + pipeStatus);
}
Expand All @@ -149,4 +180,8 @@ public void updateConsensusPipe(ConsensusPipeName consensusPipeName, PipeStatus
public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
return selector.getAllConsensusPipe();
}

public static Map<ConsensusPipeName, Boolean> getProcessorFilterSwitch() {
return processorFilterSwitchMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public interface ProgressIndexManager {
ProgressIndex assignProgressIndex(ConsensusGroupId consensusGroupId);

ProgressIndex getMaxAssignedProgressIndex(ConsensusGroupId consensusGroupId);

void recordPeerMaxProgressIndex(ConsensusGroupId consensusGroupId, ProgressIndex progressIndex);

ProgressIndex getMaxReplicatedProgressIndex(ConsensusGroupId consensusGroupId);
}
Loading
Loading