IGNITE-23783 Add more comments and checks to the test
Kirill Sizov committed Nov 29, 2024
1 parent aa1fdb7 commit fd84b82
Showing 1 changed file with 174 additions and 28 deletions.
@@ -40,7 +40,7 @@
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -58,14 +58,14 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -99,8 +99,12 @@
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.apache.ignite.table.KeyValueView;
@@ -362,6 +366,8 @@ void testManualRebalanceIfPartitionIsLost() throws Exception {
@ZoneParams(nodes = 6, replicas = 3, partitions = 1)
public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
int partId = 0;
// Make sure the event of nodes 4 and 5 leaving is triggered only once, with both of them going out together (a single scale-down event).
executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 300));

IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));

@@ -373,54 +379,121 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
awaitPrimaryReplica(node0, partId);
assertRealAssignments(node0, partId, 1, 4, 5);

// First, insert data on nodes 1, 4 and 5.
insertValues(table, partId, 0);

triggerRaftSnapshot(1, partId);
// Second snapshot causes log truncation.
// Second snapshot causes log truncation, so the updates will be installed via a raft snapshot rather than replayed from the raft log.
triggerRaftSnapshot(1, partId);

unwrapIgniteImpl(node(1)).dropMessages((nodeName, msg) -> {
Ignite node = nullableNode(3);

return node != null && node.name().equals(nodeName) && msg instanceof SnapshotMvDataResponse;
});
// Extract node names to final strings to avoid an NPE later in the test, when node(i) is stopped and set to null.
String node0Name = node(0).name();
String node1Name = node(1).name();
String node3Name = node(3).name();
String node4Name = node(4).name();
String node5Name = node(5).name();
// We block data transfer from node 1 to node 3 in the [0, 1, 3] group. Data transfer from node 1 to node 0 will work.
// As mentioned above, data transfer is performed via raft snapshots only, not via the raft log, because we called triggerRaftSnapshot.
// Node 1 will not respond with SnapshotMvDataResponse to node 3.
// Also drop TimeoutNowRequest from node 1 to nodes 0 and 3 to avoid triggering a new leader election in the [0, 1, 3] group
// after node 1 has left.
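// (In jraft, TimeoutNowRequest is the leadership-transfer message: it tells a follower to skip its election timeout and
// start an election immediately, which is exactly the fast re-election we want to suppress here.)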
unwrapIgniteImpl(node(1))
.dropMessages((nodeName, msg) ->
(node3Name.equals(nodeName) && ((msg instanceof SnapshotMvDataResponse) || (msg instanceof TimeoutNowRequest)))
|| (node0Name.equals(nodeName) && (msg instanceof TimeoutNowRequest))
);

stopNodesInParallel(4, 5);

// Restore the scale-down timer to its original value.
executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 2));

// The scale-down triggers and we end up with 4 data nodes.
waitForScale(node0, 4);

// Now that nodes 4 and 5 are stopped, the scale-down timer triggers new assignments: [0, 1, 3] should be pending.
assertRealAssignments(node0, partId, 0, 1, 3);

Assignments assignment013 = Assignments.of(timestamp,
Assignment.forPeer(node(0).name()),
Assignment.forPeer(node(1).name()),
Assignment.forPeer(node(3).name())
);
assertPendingAssignments(node0, partId, assignment013);

assertRaftPartitionState(node0, partId, 1, LocalPartitionStateEnum.HEALTHY);
// Nodes 0 and 3 have not received any data yet, hence the INITIALIZING state.

// INITIALIZING means that a node without a log has not processed anything yet;
// it is not really "healthy" until it accepts the leader's configuration:
// if (lastLogIndex == 0) {
//     localState = INITIALIZING;
// }
assertRaftPartitionState(node0, partId, 0, LocalPartitionStateEnum.INITIALIZING);
assertRaftPartitionState(node0, partId, 3, LocalPartitionStateEnum.INITIALIZING);

// But we block the stable switch to [0, 1, 3]. A stable switch to any other set of nodes will work.
blockRebalanceStableSwitch(partId, assignment013);

// Reset produces
// pending = [1, force]
// planned = [0, 1, 3]
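// ("force" means the pending assignment is applied without waiting for a majority of the previous raft group;
// the planned set is then applied as a regular rebalance once the forced group is up.)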
CompletableFuture<Void> resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true);
assertThat(resetFuture, willCompleteSuccessfully());

waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED);
// The partition being in DEGRADED state means we have only 2 healthy nodes out of 3: node 0 and node 1.
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED, 0, 1, 3);

var localStatesFut = node0.disasterRecoveryManager().localPartitionStates(emptySet(), Set.of(node(3).name()), emptySet());
assertThat(localStatesFut, willCompleteSuccessfully());
// Node 3 will be in INSTALLING_SNAPSHOT (from node 1).
assertRaftPartitionState(node0, partId, 3, LocalPartitionStateEnum.INSTALLING_SNAPSHOT);

Assignments assignment1 = Assignments.of(timestamp,
Assignment.forPeer(node(1).name())
);
// After reset, stable assignments will have [1], pending = [0, 1, 3].
assertStableAssignments(node0, partId, assignment1);
assertPendingAssignments(node0, partId, assignment013);

Map<TablePartitionId, LocalPartitionStateByNode> localStates = localStatesFut.join();
assertThat(localStates, is(not(anEmptyMap())));
LocalPartitionStateByNode localPartitionStateByNode = localStates.get(new TablePartitionId(tableId, partId));
// Make sure node 0 has the same data as node 1.
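// (LogId equality compares both index and term, so matching values mean the two nodes converged on the same log.)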
assertTrue(
waitForCondition(() -> getRaftLogIndex(0, partId).equals(getRaftLogIndex(1, partId)), SECONDS.toMillis(20)),
() -> "Node 0 log index = " + getRaftLogIndex(0, partId) + " node 1 log index= " + getRaftLogIndex(1, partId)
);

assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, localPartitionStateByNode.values().iterator().next().state);
// Nodes 0 and 1 have the new configuration [0, 1, 3], while node 3 stays on the old one [1, 4, 5].
assertThat(getRaftNode(0, partId).getCurrentConf().getPeers(), containsInAnyOrder(
new PeerId(node0Name),
new PeerId(node1Name),
new PeerId(node3Name)
));
assertThat(getRaftNode(1, partId).getCurrentConf().getPeers(), containsInAnyOrder(
new PeerId(node0Name),
new PeerId(node1Name),
new PeerId(node3Name)
));
assertThat(getRaftNode(3, partId).getCurrentConf().getPeers(), containsInAnyOrder(
new PeerId(node1Name),
new PeerId(node4Name),
new PeerId(node5Name)
));

stopNode(1);
waitForScale(node0, 3);

waitForPartitionState(node0, partId, GlobalPartitionStateEnum.DEGRADED);
// When node 1 goes down, the other two nodes of [0, 1, 3] try to start a leader election.
// Node 0 sends a PreVote to node 3, but node 3 is still on the old configuration, which does not include node 0,
// so node 3 rejects node 0's PreVote.
// Node 3 itself is in INSTALLING_SNAPSHOT (from node 1), so it is unable to send PreVote messages.
// Node 2 is not in the group at all.
// As a result, no leader can be elected and the partition ends up READ_ONLY.
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.READ_ONLY, 0, 2, 3);

// Reset produces
// pending = [0, force]
// planned = [0, 2, 3]
resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true);
assertThat(resetFuture, willCompleteSuccessfully());

waitForPartitionState(node0, partId, GlobalPartitionStateEnum.AVAILABLE);
waitForPartitionState(node0, partId, GlobalPartitionStateEnum.AVAILABLE, 0, 2, 3);

awaitPrimaryReplica(node0, partId);
assertRealAssignments(node0, partId, 0, 2, 3);
@@ -444,6 +517,7 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception {
});
}


/**
* Tests that, in the situation from the test {@link #testInsertFailsIfMajorityIsLost()}, it is possible to recover the partition using
* the disaster recovery API, but with the manual flag set to false. We expect that in this case the replica factor won't be restored.
Expand Down Expand Up @@ -988,24 +1062,37 @@ private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, Assignmen
return false;
}

private void waitForPartitionState(IgniteImpl node0, int partId, GlobalPartitionStateEnum expectedState) throws InterruptedException {
private void waitForPartitionState(IgniteImpl node0, int partId, GlobalPartitionStateEnum expectedState, int... nodeIndexes)
throws InterruptedException {
assertTrue(waitForCondition(() -> {
CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> statesFuture = node0.disasterRecoveryManager()
.globalPartitionStates(Set.of(zoneName), emptySet());

assertThat(statesFuture, willCompleteSuccessfully());

Map<TablePartitionId, GlobalPartitionState> map = statesFuture.join();

GlobalPartitionState state = map.get(new TablePartitionId(tableId, partId));

return state != null && state.state == expectedState;
}, 500, 20_000),
},
500,
SECONDS.toMillis(20)
),
() -> "Expected state: " + expectedState
+ ", actual: " + node0.disasterRecoveryManager().globalPartitionStates(Set.of(zoneName), emptySet()).join()
+ ",\n nodes states: \n" + nodeStates(node0, partId, nodeIndexes)
);
}

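/** Renders the raft partition states of the given nodes into a string for assertion failure messages. */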
private String nodeStates(IgniteImpl node, int partId, int... nodeIndexes) {
return IntStream.of(nodeIndexes)
.mapToObj(index -> raftPartitionStateOnNode(node, index).get(new TablePartitionId(tableId, partId)))
.filter(Objects::nonNull)
.map(Object::toString)
.collect(Collectors.joining(",\n"));
}

private String findLeader(int nodeIdx, int partId) {
IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));

@@ -1017,8 +1104,23 @@ private String findLeader(int nodeIdx, int partId) {
return raftGroupService.getRaftNode().getLeaderId().getConsistentId();
}

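/** Returns the jraft node of the given partition's raft group on the cluster node with the given index. */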
private NodeImpl getRaftNode(int nodeIdx, int partId) {
IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));

var raftNodeId = new RaftNodeId(new TablePartitionId(tableId, partId), new Peer(node.name()));
var jraftServer = (JraftServerImpl) node.raftManager().server();

RaftGroupService raftGroupService = jraftServer.raftGroupService(raftNodeId);
assertNotNull(raftGroupService);

return (NodeImpl) raftGroupService.getRaftNode();
}

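/** Returns the id (index and term) of the last entry in the partition's raft log on the given node. */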
private LogId getRaftLogIndex(int nodeIdx, int partId) {
return getRaftNode(nodeIdx, partId).lastLogIndexAndTerm();
}

private void triggerRaftSnapshot(int nodeIdx, int partId) throws InterruptedException, ExecutionException {
//noinspection resource
IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));

var raftNodeId = new RaftNodeId(new TablePartitionId(tableId, partId), new Peer(node.name()));
@@ -1034,6 +1136,42 @@ private void triggerRaftSnapshot(int nodeIdx, int partId) throws InterruptedExce
assertEquals(RaftError.SUCCESS, fut.get().getRaftError());
}

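/**
 * Waits until the raft partition state of the node at {@code targetNodeIdx}, as observed from {@code node}, matches {@code expected};
 * {@code null} means that no state for the partition is expected on that node.
 */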
private void assertRaftPartitionState(
IgniteImpl node,
int partId,
int targetNodeIdx,
@Nullable LocalPartitionStateEnum expected
) throws InterruptedException {
assertTrue(waitForCondition(() -> {
Map<TablePartitionId, LocalPartitionStateByNode> states = raftPartitionStateOnNode(node, targetNodeIdx);

if (expected == null) {
return states.isEmpty();
}

if (states.isEmpty()) {
return false;
}

LocalPartitionStateByNode localPartitionStateByNode = states.get(new TablePartitionId(tableId, partId));

// Guard against the states map missing this particular partition; an NPE here would escape the waitForCondition closure.
return localPartitionStateByNode != null
&& expected == localPartitionStateByNode.values().iterator().next().state;
},
250,
SECONDS.toMillis(20)
),
() -> "Expected state: " + expected + ", actual: " + raftPartitionStateOnNode(node, targetNodeIdx)
);
}

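/** Queries the local partition states of a single target node via the disaster recovery manager. */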
private Map<TablePartitionId, LocalPartitionStateByNode> raftPartitionStateOnNode(IgniteImpl node, int targetNodeIdx) {
var localStatesFut =
node.disasterRecoveryManager().localPartitionStates(emptySet(), Set.of(node(targetNodeIdx).name()), emptySet());
assertThat(localStatesFut, willCompleteSuccessfully());

return localStatesFut.join();
}

private void awaitPrimaryReplica(IgniteImpl node0, int partId) {
CompletableFuture<ReplicaMeta> awaitPrimaryReplicaFuture = node0.placementDriver()
.awaitPrimaryReplica(new TablePartitionId(tableId, partId), node0.clock().now(), 60, SECONDS);
@@ -1049,8 +1187,12 @@ private void assertRealAssignments(IgniteImpl node0, int partId, Integer... expe
}

private void assertPendingAssignments(IgniteImpl node0, int partId, Assignments expected) throws InterruptedException {
assertPendingAssignments(node0, partId, expected, 2000);
}

private void assertPendingAssignments(IgniteImpl node0, int partId, Assignments expected, long timeout) throws InterruptedException {
assertTrue(
waitForCondition(() -> expected.equals(getPendingAssignments(node0, partId)), 2000),
waitForCondition(() -> expected.equals(getPendingAssignments(node0, partId)), timeout),
() -> "Expected: " + expected + ", actual: " + getPendingAssignments(node0, partId)
);
}
@@ -1063,8 +1205,12 @@ private void assertPlannedAssignments(IgniteImpl node0, int partId, Assignments
}

private void assertStableAssignments(IgniteImpl node0, int partId, Assignments expected) throws InterruptedException {
assertStableAssignments(node0, partId, expected, 2000);
}

private void assertStableAssignments(IgniteImpl node0, int partId, Assignments expected, long timeout) throws InterruptedException {
assertTrue(
waitForCondition(() -> expected.equals(getStableAssignments(node0, partId)), 2000),
waitForCondition(() -> expected.equals(getStableAssignments(node0, partId)), timeout),
() -> "Expected: " + expected + ", actual: " + getStableAssignments(node0, partId)
);
}