Skip to content

Commit

Permalink
IGNITE-23783 Add more comments and checks to the test
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill Sizov committed Nov 29, 2024
1 parent 05c8a32 commit 47b7b15
Showing 1 changed file with 46 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -98,9 +99,12 @@
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.apache.ignite.table.KeyValueView;
Expand Down Expand Up @@ -382,12 +386,22 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
// Second snapshot causes log truncation so that the updates will be installed with a snapshot rather than with a raft log.
triggerRaftSnapshot(1, partId);

// Extract node to a final string to avoid NPE in the test when dropMessages is called via a race on shutdown - node() returns null.
// Extract the node names into final strings to avoid an NPE later in the test, when node(i) is stopped and set to null.
String node0Name = node(0).name();
String node1Name = node(1).name();
String node3Name = node(3).name();
String node4Name = node(4).name();
String node5Name = node(5).name();
// We block data transfer from node 1 to node 3 in the [0, 1, 3] group. Data transfer from node 1 to node 0 will still work.
// As mentioned previously, data is transferred via raft snapshots only, not via the raft log, because we called triggerRaftSnapshot.
// Node 1 will not respond with SnapshotMvDataResponse to node 3.
unwrapIgniteImpl(node(1)).dropMessages((nodeName, msg) -> node3Name.equals(nodeName) && msg instanceof SnapshotMvDataResponse);
// Also drop TimeoutNowRequest messages from node 1 to nodes 0 and 3 to avoid triggering a new leader election
// in the [0, 1, 3] group after node 1 has left.
unwrapIgniteImpl(node(1))
.dropMessages((nodeName, msg) ->
(node3Name.equals(nodeName) && ((msg instanceof SnapshotMvDataResponse) || (msg instanceof TimeoutNowRequest)))
|| (node0Name.equals(nodeName) && (msg instanceof TimeoutNowRequest))
);

stopNodesInParallel(4, 5);

Expand Down Expand Up @@ -428,7 +442,7 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
assertThat(resetFuture, willCompleteSuccessfully());

// The partition being in the DEGRADED state means that only 2 of the 3 nodes are healthy: node 0 and node 1.
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED);
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED, 0, 1, 3);

// Node 3 will be in INSTALLING_SNAPSHOT (from node 1).
assertRaftPartitionState(node0, partId, 3, LocalPartitionStateEnum.INSTALLING_SNAPSHOT);
Expand All @@ -446,13 +460,32 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
() -> "Node 0 log index = " + getRaftLogIndex(0, partId) + " node 1 log index= " + getRaftLogIndex(1, partId)
);

// Nodes 1 and 0 have the configuration of [0,1,3], node 3 stays on the old one [1,4,5].
assertThat(getRaftNode(0, partId).getCurrentConf().getPeers(), containsInAnyOrder(
new PeerId(node0Name),
new PeerId(node1Name),
new PeerId(node3Name)
));
assertThat(getRaftNode(1, partId).getCurrentConf().getPeers(), containsInAnyOrder(
new PeerId(node0Name),
new PeerId(node1Name),
new PeerId(node3Name)
));
assertThat(getRaftNode(3, partId).getCurrentConf().getPeers(), containsInAnyOrder(
new PeerId(node1Name),
new PeerId(node4Name),
new PeerId(node5Name)
));

stopNode(1);
waitForScale(node0, 3);

// Node 3 will be in INSTALLING_SNAPSHOT (from node 1) until it is cancelled by installing a new snapshot from node 0.
// We expect that node 0 will start a new election and elect itself as a leader.
// Then node 0 will install a snapshot to node 3.
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED, 0, 2, 3);
// When node 1 is down, the other two nodes of [0,1,3] try to start leader election.
// Node 0 sends PreVote to node 3, but node 3 is still on the old configuration that does not include 0,
// thus node 3 rejects 0's prevote.
// Node 3 itself will be in INSTALLING_SNAPSHOT (from node 1), thus it is unable to send PreVote messages.
// Node 2 is not in the group at all.
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.READ_ONLY, 0, 2, 3);

// Reset produces
// pending = [0, force]
Expand Down Expand Up @@ -1068,7 +1101,7 @@ private String findLeader(int nodeIdx, int partId) {
return raftGroupService.getRaftNode().getLeaderId().getConsistentId();
}

private LogId getRaftLogIndex(int nodeIdx, int partId) {
private NodeImpl getRaftNode(int nodeIdx, int partId) {
IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));

var raftNodeId = new RaftNodeId(new TablePartitionId(tableId, partId), new Peer(node.name()));
Expand All @@ -1077,7 +1110,11 @@ private LogId getRaftLogIndex(int nodeIdx, int partId) {
RaftGroupService raftGroupService = jraftServer.raftGroupService(raftNodeId);
assertNotNull(raftGroupService);

return raftGroupService.getRaftNode().lastLogIndexAndTerm();
return (NodeImpl) raftGroupService.getRaftNode();
}

private LogId getRaftLogIndex(int nodeIdx, int partId) {
return getRaftNode(nodeIdx, partId).lastLogIndexAndTerm();
}

private void triggerRaftSnapshot(int nodeIdx, int partId) throws InterruptedException, ExecutionException {
Expand Down

0 comments on commit 47b7b15

Please sign in to comment.