Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to dev/1.3][region migration] Make peer list correct when IoTConsensus & IoTConsensusV2 & Ratis starting #14535

Merged
merged 3 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
Expand Down Expand Up @@ -360,11 +359,8 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {

resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
resp.setRuntimeConfiguration(getRuntimeConfiguration());
List<TConsensusGroupId> consensusGroupIds =
getPartitionManager().getAllReplicaSets(nodeId).stream()
.map(TRegionReplicaSet::getRegionId)
.collect(Collectors.toList());
resp.setConsensusGroupIds(consensusGroupIds);

resp.setCorrectConsensusGroups(getPartitionManager().getAllReplicaSets(nodeId));
return resp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import java.util.stream.StreamSupport;

import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR;
import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_ALREADY_EXIST_MSG;
import static org.apache.iotdb.commons.pipe.config.constant.PipeRPCMessageConstant.PIPE_NOT_EXIST_MSG;

public class PipeTaskInfo implements SnapshotProcessor {

Expand Down Expand Up @@ -179,8 +181,8 @@ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeReq

final String exceptionMessage =
String.format(
"Failed to create pipe %s, the pipe with the same name has been created",
createPipeRequest.getPipeName());
"Failed to create pipe %s, %s",
createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand All @@ -204,7 +206,7 @@ private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq

final String exceptionMessage =
String.format(
"Failed to alter pipe %s, the pipe does not exist", alterPipeRequest.getPipeName());
"Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand Down Expand Up @@ -281,7 +283,7 @@ public void checkBeforeStartPipe(final String pipeName) throws PipeException {
private void checkBeforeStartPipeInternal(final String pipeName) throws PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to start pipe %s, the pipe does not exist", pipeName);
String.format("Failed to start pipe %s, %s", pipeName, PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand All @@ -307,7 +309,7 @@ public void checkBeforeStopPipe(final String pipeName) throws PipeException {
private void checkBeforeStopPipeInternal(final String pipeName) throws PipeException {
if (!isPipeExisted(pipeName)) {
final String exceptionMessage =
String.format("Failed to stop pipe %s, the pipe does not exist", pipeName);
String.format("Failed to stop pipe %s, %s", pipeName, PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

/** Consensus module base interface. */
@ThreadSafe
Expand Down Expand Up @@ -145,6 +146,15 @@ public interface IConsensus {
*/
void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException;

/**
 * Records the correct peer list (likely obtained from the ConfigNode) for future use in
 * resetPeerList.
 *
 * <p>Only use this method if necessary. If it is called, it should be called before {@link
 * #start()}.
 *
 * @param correctPeerList the correct member list of each consensus group, keyed by the id of
 *     that group
 */
void recordCorrectPeerListBeforeStarting(Map<ConsensusGroupId, List<Peer>> correctPeerList);

/**
* Reset the peer list of the corresponding consensus group. Currently only used in the automatic
* cleanup of region migration as a rollback for {@link #addRemotePeer(ConsensusGroupId, Peer)},
Expand Down Expand Up @@ -226,17 +236,6 @@ public interface IConsensus {
*/
List<ConsensusGroupId> getAllConsensusGroupIds();

/**
 * Returns all consensus group ids found on disk.
 *
 * <p>All RegionGroupIds have to be parsed from the disk directory before the consensus layer is
 * started. At that point {@link #getAllConsensusGroupIds()} returns an empty list, which is why
 * this separate method exists.
 *
 * @return the list of consensus group ids parsed from disk
 */
List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting();

/**
* Return the region directory of the corresponding consensus group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class IoTConsensus implements IConsensus {

Expand All @@ -99,6 +101,7 @@ public class IoTConsensus implements IConsensus {
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final ScheduledExecutorService backgroundTaskService;
private Future<?> updateReaderFuture;
private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;

public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
Expand Down Expand Up @@ -178,10 +181,32 @@ private void initAndRecover() throws IOException {
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
consensus.start();
}
}
}
if (correctPeerListBeforeStart != null) {
BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
(consensusGroupId, peers) -> {
try {
resetPeerList(consensusGroupId, peers);
} catch (ConsensusGroupNotExistException ignore) {

} catch (Exception e) {
logger.warn("Failed to reset peer list while start", e);
}
};
// make peers which are in list correct
correctPeerListBeforeStart.forEach(resetPeerListWithoutThrow);
// clear peers which are not in the list
stateMachineMap.keySet().stream()
.filter(consensusGroupId -> !correctPeerListBeforeStart.containsKey(consensusGroupId))
// copy to a new list to avoid concurrent modification
.collect(Collectors.toList())
.forEach(
consensusGroupId ->
resetPeerListWithoutThrow.accept(consensusGroupId, Collections.emptyList()));
}
stateMachineMap.values().forEach(IoTConsensusServerImpl::start);
}

@Override
Expand Down Expand Up @@ -435,36 +460,6 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
}

/**
 * Returns the consensus group ids obtained by scanning this instance's storage directory via
 * {@link #getConsensusGroupIdsFromDir(File, Logger)}.
 *
 * @return the consensus group ids parsed from the storage directory
 */
@Override
public List<ConsensusGroupId> getAllConsensusGroupIdsWithoutStarting() {
  final List<ConsensusGroupId> idsOnDisk = getConsensusGroupIdsFromDir(storageDir, logger);
  return idsOnDisk;
}

/**
 * Scans {@code storageDir} and builds a consensus group id from every entry whose file name,
 * split on {@code "_"}, starts with two integer fields.
 *
 * <p>Entries whose names cannot be parsed that way are logged at info level and skipped. An
 * {@link IOException} raised while opening or closing the directory stream is logged at error
 * level, and whatever ids were collected so far are returned.
 *
 * @param storageDir the directory to scan
 * @param logger the logger used to report skipped entries and I/O failures
 * @return an empty list if {@code storageDir} does not exist, otherwise the ids parsed from its
 *     entries
 */
public static List<ConsensusGroupId> getConsensusGroupIdsFromDir(File storageDir, Logger logger) {
  // Guard clause: nothing to scan when the directory is absent.
  if (!storageDir.exists()) {
    return Collections.emptyList();
  }

  final List<ConsensusGroupId> parsedIds = new ArrayList<>();
  try (DirectoryStream<Path> entries = Files.newDirectoryStream(storageDir.toPath())) {
    for (Path entry : entries) {
      final String entryName = entry.getFileName().toString();
      try {
        final String[] fields = entryName.split("_");
        final int firstField = Integer.parseInt(fields[0]);
        final int secondField = Integer.parseInt(fields[1]);
        parsedIds.add(ConsensusGroupId.Factory.create(firstField, secondField));
      } catch (Exception e) {
        // Any failure while parsing means the entry is not a group directory; skip it.
        logger.info("The directory {} is not a group directory; ignoring it. ", entryName);
      }
    }
  } catch (IOException e) {
    logger.error("Failed to get all consensus group ids from disk", e);
  }
  return parsedIds;
}

@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
return buildPeerDir(storageDir, groupId);
Expand All @@ -483,10 +478,16 @@ public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
.init(config.getReplication().getRegionMigrationSpeedLimitBytesPerSecond());
}

/**
 * Logs the supplied peer lists and keeps a reference to the map (no copy is made) in {@code
 * correctPeerListBeforeStart}.
 *
 * @param correctPeerList the correct peer list of each consensus group, keyed by group id
 */
@Override
public void recordCorrectPeerListBeforeStarting(
    final Map<ConsensusGroupId, List<Peer>> correctPeerList) {
  logger.info("Record correct peer list: {}", correctPeerList);
  correctPeerListBeforeStart = correctPeerList;
}

@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
logger.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers);
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
Expand All @@ -501,27 +502,37 @@ public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
}

synchronized (impl) {
// remove invalid peer
ImmutableList<Peer> currentMembers = ImmutableList.copyOf(impl.getConfiguration());
String previousPeerListStr = currentMembers.toString();
for (Peer peer : currentMembers) {
if (!correctPeers.contains(peer)) {
if (!impl.removeSyncLogChannel(peer)) {
logger.error(
"[RESET PEER LIST] Failed to remove peer {}'s sync log channel from group {}",
peer,
groupId);
logger.error("[RESET PEER LIST] Failed to remove sync channel with: {}", peer);
} else {
logger.info("[RESET PEER LIST] Remove sync channel with: {}", peer);
}
}
}
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
previousPeerListStr,
impl.getConfiguration());
// add correct peer
for (Peer peer : correctPeers) {
if (!impl.getConfiguration().contains(peer)) {
logger.warn("[RESET PEER LIST] \"Correct peer\" {} is not in local peer list", peer);
impl.buildSyncLogChannel(peer);
logger.info("[RESET PEER LIST] Build sync channel with: {}", peer);
}
}
// show result
String newPeerListStr = impl.getConfiguration().toString();
if (!previousPeerListStr.equals(newPeerListStr)) {
logger.info(
"[RESET PEER LIST] Local peer list has been reset: {} -> {}",
previousPeerListStr,
newPeerListStr);
} else {
logger.info(
"[RESET PEER LIST] The current peer list is correct, nothing need to be reset: {}",
previousPeerListStr);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,17 +645,12 @@ private boolean isSuccess(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}

/**
* build SyncLog channel with safeIndex as the default initial sync index.
*
* @throws ConsensusGroupModifyPeerException
*/
public void buildSyncLogChannel(Peer targetPeer) throws ConsensusGroupModifyPeerException {
/** build SyncLog channel with safeIndex as the default initial sync index. */
public void buildSyncLogChannel(Peer targetPeer) {
buildSyncLogChannel(targetPeer, getMinSyncIndex());
}

public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex)
throws ConsensusGroupModifyPeerException {
public void buildSyncLogChannel(Peer targetPeer, long initialSyncIndex) {
KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
// step 1, build sync channel in LogDispatcher
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,8 @@ public TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
return new TBuildSyncLogChannelRes(status);
}
TSStatus responseStatus;
try {
impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
impl.buildSyncLogChannel(new Peer(groupId, req.nodeId, req.endPoint));
responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
return new TBuildSyncLogChannelRes(responseStatus);
}

Expand Down
Loading
Loading