diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java index c845b177636..1f1ec672ac9 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif; import org.apache.ignite.internal.partitiondistribution.Assignment; import org.apache.ignite.internal.partitiondistribution.Assignments; +import org.apache.ignite.internal.partitiondistribution.AssignmentsChain; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.util.ExceptionUtils; import org.jetbrains.annotations.Nullable; @@ -620,6 +621,24 @@ public static CompletableFuture> partitionAssignments( .thenApply(e -> (e.value() == null) ? null : Assignments.fromBytes(e.value()).nodes()); } + /** + * Returns partition assignments from meta storage locally. + * + * @param metaStorageManager Meta storage manager. + * @param tablePartitionId Table partition id. + * @param revision Revision. + * @return Returns partition assignments from meta storage locally or {@code null} if assignments is absent. + */ + public static @Nullable Assignments stableAssignmentsGetLocally( + MetaStorageManager metaStorageManager, + TablePartitionId tablePartitionId, + long revision + ) { + Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(tablePartitionId), revision); + + return (entry == null || entry.empty() || entry.tombstone()) ? null : Assignments.fromBytes(entry.value()); + } + /** * Returns partition assignments from meta storage locally. 
* @@ -636,9 +655,9 @@ public static Set partitionAssignmentsGetLocally( int partitionNumber, long revision ) { - Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber)), revision); + Assignments assignments = stableAssignmentsGetLocally(metaStorageManager, new TablePartitionId(tableId, partitionNumber), revision); - return (entry == null || entry.empty() || entry.tombstone()) ? null : Assignments.fromBytes(entry.value()).nodes(); + return assignments == null ? null : assignments.nodes(); } /** @@ -686,11 +705,11 @@ public static List tableAssignmentsGetLocally( ) { return IntStream.range(0, numberOfPartitions) .mapToObj(p -> { - Entry e = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, p)), revision); + Assignments assignments = stableAssignmentsGetLocally(metaStorageManager, new TablePartitionId(tableId, p), revision); - assert e != null && !e.empty() && !e.tombstone() : e; + assert assignments != null; - return Assignments.fromBytes(e.value()); + return assignments; }) .collect(toList()); } @@ -718,4 +737,42 @@ public static List tablePendingAssignmentsGetLocally( }) .collect(toList()); } + + /** + * Returns assignments chains for all table partitions from meta storage locally. + * + * @param metaStorageManager Meta storage manager. + * @param tableId Table id. + * @param numberOfPartitions Number of partitions. + * @param revision Revision. + * @return List of assignments chains indexed by partition number; an element is {@code null} if the chain is absent. + */ + public static List tableAssignmentsChainGetLocally( + MetaStorageManager metaStorageManager, + int tableId, + int numberOfPartitions, + long revision + ) { + return IntStream.range(0, numberOfPartitions) + .mapToObj(p -> assignmentsChainGetLocally(metaStorageManager, new TablePartitionId(tableId, p), revision)) + .collect(toList()); + } + + /** + * Returns assignments chain from meta storage locally. + * + * @param metaStorageManager Meta storage manager. 
+ * @param tablePartitionId Table partition id. + * @param revision Revision. + * @return Assignments chain from meta storage locally or {@code null} if the chain is absent. + */ + public static @Nullable AssignmentsChain assignmentsChainGetLocally( + MetaStorageManager metaStorageManager, + TablePartitionId tablePartitionId, + long revision + ) { + Entry e = metaStorageManager.getLocally(assignmentsChainKey(tablePartitionId), revision); + + return e != null ? AssignmentsChain.fromBytes(e.value()) : null; + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index e2a9f94fdbf..0df3f7db68d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -33,12 +33,15 @@ import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stableAssignmentsGetLocally; 
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract; +import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsChainGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablePendingAssignmentsGetLocally; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union; @@ -304,7 +307,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { * Versioned value for tracking RAFT groups initialization and starting completion. * *

Only explicitly updated in - * {@link #startLocalPartitionsAndClients(CompletableFuture, List, TableImpl, boolean, long)}. + * {@link #startLocalPartitionsAndClients(CompletableFuture, List, List, TableImpl, boolean, long)}. * *

Completed strictly after {@link #localPartitionsVv}. */ @@ -1132,6 +1135,7 @@ private CompletableFuture onTableRename(RenameTableEventParameters parameters private CompletableFuture startLocalPartitionsAndClients( CompletableFuture> stableAssignmentsFuture, List<@Nullable Assignments> pendingAssignmentsForPartitions, + List<@Nullable AssignmentsChain> assignmentsChains, TableImpl table, boolean isRecovery, long assignmentsTimestamp @@ -1161,13 +1165,23 @@ private CompletableFuture startLocalPartitionsAndClients( boolean shouldStartPartition; if (isRecovery) { - // The condition to start the replica is - // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`. - // However we check only the right part of this condition here - // since after `startTables` we have a call to `processAssignmentsOnRecovery`, - // which executes pending assignments update and will start required partitions there. - shouldStartPartition = localMemberAssignmentInStable != null - && (pendingAssignments == null || !pendingAssignments.force()); + AssignmentsChain assignmentsChain = assignmentsChains.get(i); + + if (lastRebalanceWasGraceful(assignmentsChain)) { + // The condition to start the replica is + // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`. + // However we check only the right part of this condition here + // since after `startTables` we have a call to `processAssignmentsOnRecovery`, + // which executes pending assignments update and will start required partitions there. 
+ shouldStartPartition = localMemberAssignmentInStable != null + && (pendingAssignments == null || !pendingAssignments.force()); + } else { + // TODO: Use logic from https://issues.apache.org/jira/browse/IGNITE-23874 + LOG.warn("Recovery after a forced rebalance for table is not supported yet [tableId={}, partitionId={}].", + tableId, partId); + shouldStartPartition = localMemberAssignmentInStable != null + && (pendingAssignments == null || !pendingAssignments.force()); + } } else { shouldStartPartition = localMemberAssignmentInStable != null; } @@ -1663,6 +1677,9 @@ private CompletableFuture createTableLocally( List pendingAssignments = tablePendingAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken); + List assignmentsChains = + tableAssignmentsChainGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken); + CompletableFuture> stableAssignmentsFutureAfterInvoke = writeTableAssignmentsToMetastore(tableId, zoneDescriptor.consistencyMode(), stableAssignmentsFuture); @@ -1674,6 +1691,7 @@ private CompletableFuture createTableLocally( zoneDescriptor, stableAssignmentsFutureAfterInvoke, pendingAssignments, + assignmentsChains, onNodeRecovery, catalog.time() ); @@ -1696,6 +1714,7 @@ private CompletableFuture createTableLocally( CatalogZoneDescriptor zoneDescriptor, CompletableFuture> stableAssignmentsFuture, List pendingAssignments, + List assignmentsChains, boolean onNodeRecovery, long assignmentsTimestamp ) { @@ -1748,6 +1767,7 @@ private CompletableFuture createTableLocally( return startLocalPartitionsAndClients( stableAssignmentsFuture, pendingAssignments, + assignmentsChains, table, onNodeRecovery, assignmentsTimestamp @@ -2152,7 +2172,9 @@ private CompletableFuture handleChangePendingAssignmentEvent( TablePartitionId replicaGrpId = extractTablePartitionId(pendingAssignmentsEntry.key(), PENDING_ASSIGNMENTS_PREFIX_BYTES); // Stable assignments from the meta store, which revision is bounded by the current 
pending event. - Assignments stableAssignments = stableAssignments(replicaGrpId, revision); + Assignments stableAssignments = stableAssignmentsGetLocally(metaStorageMgr, replicaGrpId, revision); + + AssignmentsChain assignmentsChain = assignmentsChainGetLocally(metaStorageMgr, replicaGrpId, revision); Assignments pendingAssignments = Assignments.fromBytes(pendingAssignmentsEntry.value()); @@ -2195,6 +2217,7 @@ private CompletableFuture handleChangePendingAssignmentEvent( table, stableAssignments, pendingAssignments, + assignmentsChain, revision, isRecovery ).thenAccept(v -> executeIfLocalNodeIsPrimaryForGroup( @@ -2218,6 +2241,7 @@ private CompletableFuture handleChangePendingAssignmentEvent( TableImpl tbl, @Nullable Assignments stableAssignments, Assignments pendingAssignments, + @Nullable AssignmentsChain assignmentsChain, long revision, boolean isRecovery ) { @@ -2235,7 +2259,14 @@ private CompletableFuture handleChangePendingAssignmentEvent( // `pending.contains(node) || (stable.contains(node) && !pending.isForce())`. // This condition covers the left part of the OR expression. // The right part of it is covered in `startLocalPartitionsAndClients`. - shouldStartLocalGroupNode = localMemberAssignmentInPending != null; + if (lastRebalanceWasGraceful(assignmentsChain)) { + shouldStartLocalGroupNode = localMemberAssignmentInPending != null; + } else { + // TODO: Use logic from https://issues.apache.org/jira/browse/IGNITE-23874. 
+ LOG.warn("Recovery after a forced rebalance for table is not supported yet [tablePartitionId={}].", + replicaGrpId); + shouldStartLocalGroupNode = localMemberAssignmentInPending != null; + } } else { shouldStartLocalGroupNode = localMemberAssignmentInPending != null && localMemberAssignmentInStable == null; } @@ -2353,6 +2384,15 @@ private CompletableFuture handleChangePendingAssignmentEvent( }), ioExecutor); } + /** + * For HA zones: Check that last rebalance was graceful (raft group maintained the majority) rather than forced (caused by a disaster + * recovery reset after losing the majority of nodes). + */ + private static boolean lastRebalanceWasGraceful(@Nullable AssignmentsChain assignmentsChain) { + // Graceful case: the chain is either absent (there has been no stable switch yet) or contains a single element. + return assignmentsChain == null || assignmentsChain.chain().size() == 1; + } + private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) { PartitionSet newPartitionSet = Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new); newPartitionSet.set(partitionId); @@ -2980,7 +3020,7 @@ public CompletableFuture restartPartition(TablePartitionId tablePartitionI TableImpl table = tables.get(tablePartitionId.tableId()); return stopPartitionForRestart(tablePartitionId, table).thenComposeAsync(unused1 -> { - Assignments stableAssignments = stableAssignments(tablePartitionId, revision); + Assignments stableAssignments = stableAssignmentsGetLocally(metaStorageMgr, tablePartitionId, revision); assert stableAssignments != null : "tablePartitionId=" + tablePartitionId + ", revision=" + revision; @@ -3005,12 +3045,6 @@ public CompletableFuture restartPartition(TablePartitionId tablePartitionI }), ioExecutor)); } - private @Nullable Assignments stableAssignments(TablePartitionId tablePartitionId, long revision) { - Entry entry = metaStorageMgr.getLocally(stablePartAssignmentsKey(tablePartitionId), 
revision); - - return Assignments.fromBytes(entry.value()); - } - @Override public void setStreamerReceiverRunner(StreamerReceiverRunner runner) { this.streamerReceiverRunner = runner;