Skip to content

Commit

Permalink
IGNITE-23879 Indicate stable switch after the second phase of resetPa…
Browse files Browse the repository at this point in the history
…rtitions (#4916)
  • Loading branch information
Cyrill authored Dec 24, 2024
1 parent 9244825 commit 8ace698
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class Assignments {
/** Empty assignments. */
public static final Assignments EMPTY =
new Assignments(Collections.emptySet(), false, HybridTimestamp.NULL_HYBRID_TIMESTAMP);
new Assignments(Collections.emptySet(), false, HybridTimestamp.NULL_HYBRID_TIMESTAMP, false);

/** Set of nodes. */
@IgniteToStringInclude
Expand All @@ -54,15 +54,24 @@ public class Assignments {
/** Time when the catalog version that the assignments were calculated against becomes active (i. e. available for use). */
private final long timestamp;

/**
* This assignment was created in the second phase of reset.
* See GroupUpdateRequest javadoc for details.
*/
private final boolean fromReset;

/**
* Constructor.
*/
private Assignments(Collection<Assignment> nodes, boolean force, long timestamp) {
private Assignments(Collection<Assignment> nodes, boolean force, long timestamp, boolean fromReset) {
// A set of nodes must be a HashSet in order for serialization to produce stable results,
// that could be compared as byte arrays.
this.nodes = nodes instanceof HashSet ? ((HashSet<Assignment>) nodes) : new HashSet<>(nodes);
this.force = force;
this.timestamp = timestamp;
this.fromReset = fromReset;

assert !(force && fromReset) : "Only one flag can be set from 'force' and 'fromReset'.";
}

/**
Expand All @@ -71,7 +80,16 @@ private Assignments(Collection<Assignment> nodes, boolean force, long timestamp)
* @param nodes Set of nodes.
*/
public static Assignments of(Set<Assignment> nodes, long timestamp) {
return new Assignments(nodes, false, timestamp);
return new Assignments(nodes, false, timestamp, false);
}

/**
* Creates a new instance.
*
* @param nodes Set of nodes.
*/
public static Assignments of(Set<Assignment> nodes, long timestamp, boolean fromReset) {
return new Assignments(nodes, false, timestamp, fromReset);
}

/**
Expand All @@ -80,7 +98,7 @@ public static Assignments of(Set<Assignment> nodes, long timestamp) {
* @param nodes Array of nodes.
*/
public static Assignments of(long timestamp, Assignment... nodes) {
return new Assignments(Arrays.asList(nodes), false, timestamp);
return new Assignments(Arrays.asList(nodes), false, timestamp, false);
}

/**
Expand All @@ -90,7 +108,7 @@ public static Assignments of(long timestamp, Assignment... nodes) {
* @see #force()
*/
public static Assignments forced(Set<Assignment> nodes, long timestamp) {
return new Assignments(nodes, true, timestamp);
return new Assignments(nodes, true, timestamp, false);
}

/**
Expand All @@ -109,6 +127,13 @@ public boolean force() {
return force;
}

/**
* Returns {@code true} if this assignment was created in the second phase of reset.
*/
public boolean fromReset() {
return fromReset;
}

/**
* Returns a timestamp when the catalog version that the assignments were calculated against becomes active.
*/
Expand Down Expand Up @@ -145,7 +170,16 @@ public byte[] toBytes() {
* @see #toBytes()
*/
public static byte[] toBytes(Set<Assignment> assignments, long timestamp) {
return new Assignments(assignments, false, timestamp).toBytes();
return new Assignments(assignments, false, timestamp, false).toBytes();
}

/**
* Serializes assignments into an array of bytes.
*
* @see #toBytes()
*/
public static byte[] toBytes(Set<Assignment> assignments, long timestamp, boolean fromReset) {
return new Assignments(assignments, false, timestamp, fromReset).toBytes();
}

/**
Expand All @@ -172,13 +206,14 @@ public boolean equals(Object o) {
}

Assignments that = (Assignments) o;
return force == that.force && nodes.equals(that.nodes);
return force == that.force && nodes.equals(that.nodes) && fromReset == that.fromReset;
}

@Override
public int hashCode() {
int result = nodes.hashCode();
result = 31 * result + Boolean.hashCode(force);
result = 31 * result + Boolean.hashCode(fromReset);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected void writeExternalData(Assignments assignments, IgniteDataOutput out)

out.writeBoolean(assignments.force());
hybridTimestamp(assignments.timestamp()).writeTo(out);
out.writeBoolean(assignments.fromReset());
}

private static void writeAssignment(Assignment assignment, IgniteDataOutput out) throws IOException {
Expand All @@ -55,8 +56,9 @@ protected Assignments readExternalData(byte protoVer, IgniteDataInput in) throws
Set<Assignment> nodes = readNodes(in);
boolean force = in.readBoolean();
HybridTimestamp timestamp = HybridTimestamp.readFrom(in);
boolean fromReset = in.readBoolean();

return force ? Assignments.forced(nodes, timestamp.longValue()) : Assignments.of(nodes, timestamp.longValue());
return force ? Assignments.forced(nodes, timestamp.longValue()) : Assignments.of(nodes, timestamp.longValue(), fromReset);
}

private static Set<Assignment> readNodes(IgniteDataInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;

class AssignmentsSerializerTest {
private static final String NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAFHCjAEA9AY=";
private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAVHCjAEA9AY=";
private static final String NOT_FORCED_NOT_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAFHCjAEA9AYA";
private static final String NOT_FORCED_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAFHCjAEA9AYB";
private static final String FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1 = "Ae++QwMEYWJjAQRkZWYAAVHCjAEA9AYA";

private final AssignmentsSerializer serializer = new AssignmentsSerializer();

Expand All @@ -51,33 +52,48 @@ private static long baseTimestamp(int logical) {
return new HybridTimestamp(BASE_PHYSICAL_TIME, logical).longValue();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void serializationAndDeserialization(boolean force) {
Assignments originalAssignments = testAssignments(force);
@CartesianTest
void serializationAndDeserialization(
@Values(booleans = {true, false}) boolean force,
@Values(booleans = {true, false}) boolean fromReset
) {
Assignments originalAssignments = testAssignments(force, fromReset);

byte[] bytes = VersionedSerialization.toBytes(originalAssignments, serializer);
Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);

assertThat(restoredAssignments, equalTo(originalAssignments));
}

private static Assignments testAssignments(boolean force) {
private static Assignments testAssignments(boolean force, boolean fromReset) {
Set<Assignment> nodes = Set.of(Assignment.forPeer("abc"), Assignment.forLearner("def"));

return force
? Assignments.forced(nodes, baseTimestamp(5))
: Assignments.of(nodes, baseTimestamp(5));
: Assignments.of(nodes, baseTimestamp(5), fromReset);
}

@Test
void v1NotForcedCanBeDeserialized() {
byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_ASSIGNMENTS_SERIALIZED_WITH_V1);
void v1NotForcedNotResetCanBeDeserialized() {
byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_NOT_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1);
Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);

assertNodesFromV1(restoredAssignments);

assertThat(restoredAssignments.force(), is(false));
assertThat(restoredAssignments.fromReset(), is(false));
assertThat(restoredAssignments.timestamp(), is(baseTimestamp(5)));
}

@Test
void v1NotForcedResetCanBeDeserialized() {
byte[] bytes = Base64.getDecoder().decode(NOT_FORCED_RESET_ASSIGNMENTS_SERIALIZED_WITH_V1);
Assignments restoredAssignments = VersionedSerialization.fromBytes(bytes, serializer);

assertNodesFromV1(restoredAssignments);

assertThat(restoredAssignments.force(), is(false));
assertThat(restoredAssignments.fromReset(), is(true));
assertThat(restoredAssignments.timestamp(), is(baseTimestamp(5)));
}

Expand All @@ -93,8 +109,8 @@ void v1ForcedCanBeDeserialized() {
}

@SuppressWarnings("unused")
private String v1Base64(boolean force) {
Assignments originalAssignments = testAssignments(force);
private String v1Base64(boolean force, boolean fromReset) {
Assignments originalAssignments = testAssignments(force, fromReset);
byte[] v1Bytes = VersionedSerialization.toBytes(originalAssignments, serializer);
return Base64.getEncoder().encodeToString(v1Bytes);
}
Expand Down
Loading

0 comments on commit 8ace698

Please sign in to comment.