From 8ace698241c8cfdaa18e9e8e3f16b136bf6809a3 Mon Sep 17 00:00:00 2001 From: Cyrill Date: Tue, 24 Dec 2024 18:17:59 +0300 Subject: [PATCH] IGNITE-23879 Indicate stable switch after the second phase of resetPartitions (#4916) --- .../partitiondistribution/Assignments.java | 49 ++++- .../AssignmentsSerializer.java | 4 +- .../AssignmentsSerializerTest.java | 44 +++-- ...ItDisasterRecoveryReconfigurationTest.java | 172 +++++++++++++++++- .../disaster/GroupUpdateRequest.java | 13 +- 5 files changed, 252 insertions(+), 30 deletions(-) diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java index 4999daed506..1c2c606c9ed 100644 --- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java +++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java @@ -38,7 +38,7 @@ public class Assignments { /** Empty assignments. */ public static final Assignments EMPTY = - new Assignments(Collections.emptySet(), false, HybridTimestamp.NULL_HYBRID_TIMESTAMP); + new Assignments(Collections.emptySet(), false, HybridTimestamp.NULL_HYBRID_TIMESTAMP, false); /** Set of nodes. */ @IgniteToStringInclude @@ -54,15 +54,24 @@ public class Assignments { /** Time when the catalog version that the assignments were calculated against becomes active (i. e. available for use). */ private final long timestamp; + /** + * This assignment was created in the second phase of reset. + * See GroupUpdateRequest javadoc for details. + */ + private final boolean fromReset; + /** * Constructor. */ - private Assignments(Collection nodes, boolean force, long timestamp) { + private Assignments(Collection nodes, boolean force, long timestamp, boolean fromReset) { // A set of nodes must be a HashSet in order for serialization to produce stable results, // that could be compared as byte arrays. this.nodes = nodes instanceof HashSet ? ((HashSet) nodes) : new HashSet<>(nodes); this.force = force; this.timestamp = timestamp; + this.fromReset = fromReset; + + assert !(force && fromReset) : "Only one flag can be set from 'force' and 'fromReset'."; } /** @@ -71,7 +80,16 @@ private Assignments(Collection nodes, boolean force, long timestamp) * @param nodes Set of nodes. */ public static Assignments of(Set nodes, long timestamp) { - return new Assignments(nodes, false, timestamp); + return new Assignments(nodes, false, timestamp, false); + } + + /** + * Creates a new instance. + * + * @param nodes Set of nodes. + */ + public static Assignments of(Set nodes, long timestamp, boolean fromReset) { + return new Assignments(nodes, false, timestamp, fromReset); } /** @@ -80,7 +98,7 @@ public static Assignments of(Set nodes, long timestamp) { * @param nodes Array of nodes. */ public static Assignments of(long timestamp, Assignment... nodes) { - return new Assignments(Arrays.asList(nodes), false, timestamp); + return new Assignments(Arrays.asList(nodes), false, timestamp, false); } /** @@ -90,7 +108,7 @@ public static Assignments of(long timestamp, Assignment... nodes) { * @see #force() */ public static Assignments forced(Set nodes, long timestamp) { - return new Assignments(nodes, true, timestamp); + return new Assignments(nodes, true, timestamp, false); } /** @@ -109,6 +127,13 @@ public boolean force() { return force; } + /** + * Returns {@code true} if this assignment was created in the second phase of reset. + */ + public boolean fromReset() { + return fromReset; + } + /** * Returns a timestamp when the catalog version that the assignments were calculated against becomes active. */ @@ -145,7 +170,16 @@ public byte[] toBytes() { * @see #toBytes() */ public static byte[] toBytes(Set assignments, long timestamp) { - return new Assignments(assignments, false, timestamp).toBytes(); + return new Assignments(assignments, false, timestamp, false).toBytes(); + } + + /** + * Serializes assignments into an array of bytes. + * + * @see #toBytes() + */ + public static byte[] toBytes(Set assignments, long timestamp, boolean fromReset) { + return new Assignments(assignments, false, timestamp, fromReset).toBytes(); } /** @@ -172,13 +206,14 @@ public boolean equals(Object o) { } Assignments that = (Assignments) o; - return force == that.force && nodes.equals(that.nodes); + return force == that.force && nodes.equals(that.nodes) && fromReset == that.fromReset; } @Override public int hashCode() { int result = nodes.hashCode(); result = 31 * result + Boolean.hashCode(force); + result = 31 * result + Boolean.hashCode(fromReset); return result; } diff --git a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java index d5ee560ed66..aafd5acc774 100644 --- a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java +++ b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializer.java @@ -43,6 +43,7 @@ protected void writeExternalData(Assignments assignments, IgniteDataOutput out) out.writeBoolean(assignments.force()); hybridTimestamp(assignments.timestamp()).writeTo(out); + out.writeBoolean(assignments.fromReset()); } private static void writeAssignment(Assignment assignment, IgniteDataOutput out) throws IOException { @@ -55,8 +56,9 @@ protected Assignments readExternalData(byte protoVer, IgniteDataInput in) throws Set nodes = readNodes(in); boolean force = in.readBoolean(); HybridTimestamp timestamp = HybridTimestamp.readFrom(in); + boolean fromReset = in.readBoolean(); - return force ? Assignments.forced(nodes, timestamp.longValue()) : Assignments.of(nodes, timestamp.longValue()); + return force ? Assignments.forced(nodes, timestamp.longValue()) : Assignments.of(nodes, timestamp.longValue(), fromReset); } private static Set readNodes(IgniteDataInput in) throws IOException { diff --git a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java index d1f9cc1cec0..31584785eae 100644 --- a/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java +++ b/modules/partition-distribution/src/test/java/org/apache/ignite/internal/partitiondistribution/AssignmentsSerializerTest.java @@ -33,12 +33,13 @@ import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.versioned.VersionedSerialization; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Values; class AssignmentsSerializerTest { - private static final String NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAFHCjAEA9AY="; - private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAVHCjAEA9AY="; + private static final String NOT_FORCED_NOT_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAFHCjAEA9AYA"; + private static final String NOT_FORCED_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAFHCjAEA9AYB"; + private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAVHCjAEA9AYA"; private final AssignmentsSerializer serializer = new AssignmentsSerializer(); @@ -51,10 +52,12 @@ private static long baseTimestamp(int logical) { return new HybridTimestamp(BASE_PHYSICAL_TIME, logical).longValue(); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void serializationAndDeserialization(boolean force) { - Assignments originalAssignments = testAssignments(force); + @CartesianTest + void serializationAndDeserialization( + @Values(booleans = {true, false}) boolean force, + @Values(booleans = {true, false}) boolean fromReset + ) { + Assignments originalAssignments = testAssignments(force, fromReset); byte[] bytes = VersionedSerialization.toBytes(originalAssignments, serializer); Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer); @@ -62,22 +65,35 @@ void serializationAndDeserialization(boolean force) { assertThat(restoredAssignments, equalTo(originalAssignments)); } - private static Assignments testAssignments(boolean force) { + private static Assignments testAssignments(boolean force, boolean fromReset) { Set nodes = Set.of(Assignment.forPeer("abc"), Assignment.forLearner("def")); return force ? Assignments.forced(nodes, baseTimestamp(5)) - : Assignments.of(nodes, baseTimestamp(5)); + : Assignments.of(nodes, baseTimestamp(5), fromReset); } @Test - void v1NotForcedCanBeDeserialized() { - byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1); + void v1NotForcedNotResetCanBeDeserialized() { + byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_NOT_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1); Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer); assertNodesFromV1(restoredAssignments); assertThat(restoredAssignments.force(), is(false)); + assertThat(restoredAssignments.fromReset(), is(false)); + assertThat(restoredAssignments.timestamp(), is(baseTimestamp(5))); + } + + @Test + void v1NotForcedResetCanBeDeserialized() { + byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1); + Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer); + + assertNodesFromV1(restoredAssignments); + + assertThat(restoredAssignments.force(), is(false)); + assertThat(restoredAssignments.fromReset(), is(true)); assertThat(restoredAssignments.timestamp(), is(baseTimestamp(5))); } @@ -93,8 +109,8 @@ void v1ForcedCanBeDeserialized() { } @SuppressWarnings("unused") - private String v1Base64(boolean force) { - Assignments originalAssignments = testAssignments(force); + private String v1Base64(boolean force, boolean fromReset) { + Assignments originalAssignments = testAssignments(force, fromReset); byte[] v1Bytes = VersionedSerialization.toBytes(originalAssignments, serializer); return Base64.getEncoder().encodeToString(v1Bytes); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java index 54678dd161d..75b4b23ae3e 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java @@ -245,6 +245,8 @@ void testManualRebalanceIfMajorityIsLost() throws Exception { IgniteImpl node0 = igniteImpl(0); Table table = node0.tables().table(TABLE_NAME); + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); awaitPrimaryReplica(node0, partId); @@ -269,6 +271,15 @@ void testManualRebalanceIfMajorityIsLost() throws Exception { List errors = insertValues(table, partId, 0); assertThat(errors, is(empty())); + + // No fromReset flag is set on stable. + Assignments assignmentsStable = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(1).name()), + Assignment.forPeer(node(2).name()) + ), timestamp); + + assertStableAssignments(node0, partId, assignmentsStable); } /** @@ -555,6 +566,9 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception { blockRebalanceStableSwitch(partId, assignment013); + // Reset produces + // pending = [1, force] + // planned = [0, 1, 3] CompletableFuture resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); assertThat(resetFuture, willCompleteSuccessfully()); @@ -569,6 +583,15 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception { assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, localPartitionStateByNode.values().iterator().next().state); + // fromReset == true, assert force == false. + Assignments assignmentsPending = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(1).name()), + Assignment.forPeer(node(3).name()) + ), timestamp, true); + + assertPendingAssignments(node0, partId, assignmentsPending); + stopNode(1); waitForScale(node0, 3); @@ -601,6 +624,130 @@ public void testIncompleteRebalanceAfterResetPartitions() throws Exception { }); } + @Test + @ZoneParams(nodes = 5, replicas = 3, partitions = 1) + public void testNewResetOverwritesFlags() throws Exception { + int partId = 0; + + IgniteImpl node0 = igniteImpl(0); + + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); + + awaitPrimaryReplica(node0, partId); + + assertRealAssignments(node0, partId, 0, 1, 4); + + stopNodesInParallel(1, 4); + waitForScale(node0, 3); + + assertRealAssignments(node0, partId, 0, 2, 3); + + Assignments assignments = Assignments.of(timestamp, + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(2).name()), + Assignment.forPeer(node(3).name()) + ); + + blockRebalanceStableSwitch(partId, assignments); + + // Reset produces + // pending = [0, force] + // planned = [0, 2, 3] + CompletableFuture resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + assertThat(resetFuture, willCompleteSuccessfully()); + + waitForPartitionState(node0, partId, GlobalPartitionStateEnum.AVAILABLE); + + // fromReset == true, assert force == false. + Assignments assignmentsPending = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(2).name()), + Assignment.forPeer(node(3).name()) + ), timestamp, true); + + assertPendingAssignments(node0, partId, assignmentsPending); + + // Any of 3 can be chosen the node for the forced pending, so block them all. + Assignments blockedRebalance0 = Assignments.of(timestamp, + Assignment.forPeer(node(0).name()) + ); + blockRebalanceStableSwitch(partId, blockedRebalance0); + + Assignments blockedRebalance2 = Assignments.of(timestamp, + Assignment.forPeer(node(2).name()) + ); + blockRebalanceStableSwitch(partId, blockedRebalance2); + + Assignments blockedRebalance3 = Assignments.of(timestamp, + Assignment.forPeer(node(3).name()) + ); + blockRebalanceStableSwitch(partId, blockedRebalance3); + + CompletableFuture resetFuture2 = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + assertThat(resetFuture2, willCompleteSuccessfully()); + + Assignments pendingAssignments = getPendingAssignments(node0, partId); + + assertTrue(pendingAssignments.force()); + assertFalse(pendingAssignments.fromReset()); + } + + @Test + @ZoneParams(nodes = 5, replicas = 3, partitions = 1) + public void testPlannedIsOverwritten() throws Exception { + // Disable scale down to avoid unwanted rebalance. + executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE)); + int partId = 0; + + IgniteImpl node0 = igniteImpl(0); + + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); + + awaitPrimaryReplica(node0, partId); + + assertRealAssignments(node0, partId, 0, 1, 2); + + stopNodesInParallel(1, 2); + + Assignments assignments = Assignments.of(timestamp, + Assignment.forPeer(node(0).name()) + ); + + blockRebalanceStableSwitch(partId, assignments); + + // Reset produces + // pending = [0, force] + // planned = [0, 3, 4] + CompletableFuture resetFuture = node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, true, -1); + assertThat(resetFuture, willCompleteSuccessfully()); + + Assignments assignmentsPending = Assignments.forced(Set.of( + Assignment.forPeer(node(0).name()) + ), timestamp); + + assertPendingAssignments(node0, partId, assignmentsPending); + + Assignments assignmentsPlanned = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(3).name()), + Assignment.forPeer(node(4).name()) + ), timestamp, true); + + assertPlannedAssignments(node0, partId, assignmentsPlanned); + + executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, 2)); + + Assignments assignmentsPlannedReplaced = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(3).name()), + Assignment.forPeer(node(4).name()) + ), timestamp); + + assertPlannedAssignments(node0, partId, assignmentsPlannedReplaced, 10_000); + } + /** * Tests that in a situation from the test {@link #testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a * disaster recovery API, but with manual flag set to false. We expect that in this replica factor won't be restored. @@ -614,6 +761,8 @@ void testAutomaticRebalanceIfMajorityIsLost() throws Exception { IgniteImpl node0 = igniteImpl(0); Table table = node0.tables().table(TABLE_NAME); + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); awaitPrimaryReplica(node0, partId); @@ -643,6 +792,13 @@ void testAutomaticRebalanceIfMajorityIsLost() throws Exception { assertNull(getPendingAssignments(node0, partId)); assertRealAssignments(node0, partId, 1); + + // No fromReset flag is set on stable. + Assignments assignmentsStable = Assignments.of(Set.of( + Assignment.forPeer(node(1).name()) + ), timestamp); + + assertStableAssignments(node0, partId, assignmentsStable); } /** @@ -740,14 +896,16 @@ public void testIncompleteRebalanceBeforeAutomaticResetPartitions() throws Excep node0.disasterRecoveryManager().resetAllPartitions(zoneName, QUALIFIED_TABLE_NAME, false, 1); assertThat(resetFuture, willCompleteSuccessfully()); + // force == true, fromReset == false. Assignments assignmentForced1 = Assignments.forced(Set.of(Assignment.forPeer(node(1).name())), timestamp); assertPendingAssignments(node0, partId, assignmentForced1); + // fromReset == true, force == false. Assignments assignments13 = Assignments.of(Set.of( Assignment.forPeer(node(1).name()), Assignment.forPeer(node(3).name()) - ), timestamp); + ), timestamp, true); assertPlannedAssignments(node0, partId, assignments13); } @@ -874,7 +1032,7 @@ void testThoPhaseResetMaxLogIndex() throws Exception { .map(Assignment::forPeer) .collect(Collectors.toSet()); - Assignments assignmentsPlanned = Assignments.of(peers, timestamp); + Assignments assignmentsPlanned = Assignments.of(peers, timestamp, true); assertPlannedAssignments(node0, partId, assignmentsPlanned); @@ -1009,7 +1167,7 @@ void testThoPhaseResetEqualLogIndex() throws Exception { .map(Assignment::forPeer) .collect(Collectors.toSet()); - Assignments assignmentsPlanned = Assignments.of(peers, timestamp); + Assignments assignmentsPlanned = Assignments.of(peers, timestamp, true); assertPlannedAssignments(node0, partId, assignmentsPlanned); @@ -1071,7 +1229,7 @@ void testTwoPhaseResetOnEmptyNodes() throws Exception { Assignment.forPeer(node(0).name()), Assignment.forPeer(node(1).name()), Assignment.forPeer(node(4).name()) - ), timestamp); + ), timestamp, true); assertPlannedAssignments(node0, partId, assignments13); } @@ -1234,8 +1392,12 @@ private void assertPendingAssignments(IgniteImpl node0, int partId, Assignments } private void assertPlannedAssignments(IgniteImpl node0, int partId, Assignments expected) throws InterruptedException { + assertPlannedAssignments(node0, partId, expected, 2000); + } + + private void assertPlannedAssignments(IgniteImpl node0, int partId, Assignments expected, long timeout) throws InterruptedException { assertTrue( - waitForCondition(() -> expected.equals(getPlannedAssignments(node0, partId)), 2000), + waitForCondition(() -> expected.equals(getPlannedAssignments(node0, partId)), timeout), () -> "Expected: " + expected + ", actual: " + getPlannedAssignments(node0, partId) ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java index 1db49fb3c03..085de7dba4a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java @@ -78,6 +78,13 @@ import org.apache.ignite.internal.util.CollectionUtils; import org.jetbrains.annotations.Nullable; +/** + * Executes partition reset request to restore partition assignments when majority is not available. + * + *

The reset is executed in two stages - first we switch to a single node having the most up-to-date data, + * then we switch to other available nodes up to the configured replica factor, in the case of manual reset, and to the available nodes from + * the original group, in the case of the automatic reset. + */ class GroupUpdateRequest implements DisasterRecoveryRequest { private static final IgniteLogger LOG = Loggers.forClass(GroupUpdateRequest.class); @@ -333,7 +340,7 @@ private static CompletableFuture partitionUpdate( // If planned nodes set consists of reset node assignment only then we shouldn't schedule the same planned rebalance. isProposedPendingEqualsProposedPlanned ? null - : Assignments.toBytes(partAssignments, assignmentsTimestamp) + : Assignments.toBytes(partAssignments, assignmentsTimestamp, true) ); return metaStorageMgr.invoke(invokeClosure).thenApply(sr -> { @@ -366,8 +373,8 @@ private static CompletableFuture partitionUpdate( } /** - * Returns an assignment with the most up to date log index, if there are more than one node with the same index, - * returns the first one in the lexicographic order. + * Returns an assignment with the most up to date log index, if there are more than one node with the same index, returns the first one + * in the lexicographic order. */ private static Assignment nextAssignment( LocalPartitionStateMessageByNode localPartitionStateByNode,