Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23885 Start partition in a common manner in case of a single element in chain #5012

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsChain;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -620,6 +621,24 @@ public static CompletableFuture<Set<Assignment>> partitionAssignments(
.thenApply(e -> (e.value() == null) ? null : Assignments.fromBytes(e.value()).nodes());
}

/**
* Returns partition assignments from meta storage locally.
*
* @param metaStorageManager Meta storage manager.
* @param tablePartitionId Table partition id.
* @param revision Revision.
* @return Partition assignments read locally from meta storage, or {@code null} if the assignments are absent.
*/
public static @Nullable Assignments stableAssignments(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's name it with "Locally" suffix, or rename tableAssignmentsChainGetLocally. Let's have common naming pattern for "get locally" methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to *GetLocally

MetaStorageManager metaStorageManager,
TablePartitionId tablePartitionId,
long revision
) {
Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(tablePartitionId), revision);

return (entry == null || entry.empty() || entry.tombstone()) ? null : Assignments.fromBytes(entry.value());
}

/**
* Returns partition assignments from meta storage locally.
*
Expand All @@ -636,9 +655,9 @@ public static Set<Assignment> partitionAssignmentsGetLocally(
int partitionNumber,
long revision
) {
Entry entry = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, partitionNumber)), revision);
Assignments assignments = stableAssignments(metaStorageManager, new TablePartitionId(tableId, partitionNumber), revision);

return (entry == null || entry.empty() || entry.tombstone()) ? null : Assignments.fromBytes(entry.value()).nodes();
return assignments == null ? null : assignments.nodes();
}

/**
Expand Down Expand Up @@ -686,11 +705,11 @@ public static List<Assignments> tableAssignmentsGetLocally(
) {
return IntStream.range(0, numberOfPartitions)
.mapToObj(p -> {
Entry e = metaStorageManager.getLocally(stablePartAssignmentsKey(new TablePartitionId(tableId, p)), revision);
Assignments assignments = stableAssignments(metaStorageManager, new TablePartitionId(tableId, p), revision);

assert e != null && !e.empty() && !e.tombstone() : e;
assert assignments != null;

return Assignments.fromBytes(e.value());
return assignments;
})
.collect(toList());
}
Expand Down Expand Up @@ -718,4 +737,42 @@ public static List<Assignments> tablePendingAssignmentsGetLocally(
})
.collect(toList());
}

/**
* Returns assignments chains for all table partitions from meta storage locally.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table id.
* @param numberOfPartitions Number of partitions.
* @param revision Revision.
* @return List of assignments chains, one element per partition; an element is {@code null} if the chain for that partition is absent.
*/
public static List<AssignmentsChain> tableAssignmentsChainGetLocally(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or let's remove locally suffix from here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to *GetLocally

MetaStorageManager metaStorageManager,
int tableId,
int numberOfPartitions,
long revision
) {
return IntStream.range(0, numberOfPartitions)
.mapToObj(p -> assignmentsChain(metaStorageManager, new TablePartitionId(tableId, p), revision))
.collect(toList());
}

/**
 * Reads the assignments chain of a single partition from meta storage locally, bounded by the given revision.
 *
 * <p>Absence is detected the same way as for stable assignments: a {@code null}, empty or tombstone entry is
 * treated as "no chain" and is never passed to deserialization.
 *
 * @param metaStorageManager Meta storage manager.
 * @param tablePartitionId Table partition id.
 * @param revision Revision.
 * @return Assignments chain read locally from meta storage, or {@code null} if the chain is absent.
 */
public static @Nullable AssignmentsChain assignmentsChain(
        MetaStorageManager metaStorageManager,
        TablePartitionId tablePartitionId,
        long revision
) {
    Entry entry = metaStorageManager.getLocally(assignmentsChainKey(tablePartitionId), revision);

    // Do not rely on fromBytes() tolerating a missing value: check for absence explicitly.
    if (entry == null || entry.empty() || entry.tombstone()) {
        return null;
    }

    return AssignmentsChain.fromBytes(entry.value());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX_BYTES;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChain;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.assignmentsChainKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignmentsGetLocally;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stableAssignments;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsChainGetLocally;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignmentsGetLocally;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tablePendingAssignmentsGetLocally;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
Expand Down Expand Up @@ -304,7 +307,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
* Versioned value for tracking RAFT groups initialization and starting completion.
*
* <p>Only explicitly updated in
* {@link #startLocalPartitionsAndClients(CompletableFuture, List, TableImpl, boolean, long)}.
* {@link #startLocalPartitionsAndClients(CompletableFuture, List, List, TableImpl, boolean, long)}.
*
* <p>Completed strictly after {@link #localPartitionsVv}.
*/
Expand Down Expand Up @@ -1132,6 +1135,7 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters
private CompletableFuture<Void> startLocalPartitionsAndClients(
CompletableFuture<List<Assignments>> stableAssignmentsFuture,
List<@Nullable Assignments> pendingAssignmentsForPartitions,
List<@Nullable AssignmentsChain> assignmentsChains,
TableImpl table,
boolean isRecovery,
long assignmentsTimestamp
Expand Down Expand Up @@ -1161,13 +1165,21 @@ private CompletableFuture<Void> startLocalPartitionsAndClients(
boolean shouldStartPartition;

if (isRecovery) {
// The condition to start the replica is
// `pending.contains(node) || (stable.contains(node) && !pending.isForce())`.
// However we check only the right part of this condition here
// since after `startTables` we have a call to `processAssignmentsOnRecovery`,
// which executes pending assignments update and will start required partitions there.
shouldStartPartition = localMemberAssignmentInStable != null
&& (pendingAssignments == null || !pendingAssignments.force());
AssignmentsChain assignmentsChain = assignmentsChains.get(i);

if (assignmentsChain == null || assignmentsChain.chain().size() == 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment describing why assignmentsChain.chain().size() == 1 is a special case and why the behaviour differs in the other case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would refactor this check to a separate method so we could reuse this method in the second place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// The condition to start the replica is
// `pending.contains(node) || (stable.contains(node) && !pending.isForce())`.
// However we check only the right part of this condition here
// since after `startTables` we have a call to `processAssignmentsOnRecovery`,
// which executes pending assignments update and will start required partitions there.
shouldStartPartition = localMemberAssignmentInStable != null
&& (pendingAssignments == null || !pendingAssignments.force());
} else {
// TODO: Use logic from https://issues.apache.org/jira/browse/IGNITE-23874
shouldStartPartition = localMemberAssignmentInStable != null
&& (pendingAssignments == null || !pendingAssignments.force());
}
} else {
shouldStartPartition = localMemberAssignmentInStable != null;
}
Expand Down Expand Up @@ -1663,6 +1675,9 @@ private CompletableFuture<?> createTableLocally(
List<Assignments> pendingAssignments =
tablePendingAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken);

List<AssignmentsChain> assignmentsChains =
tableAssignmentsChainGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken);

CompletableFuture<List<Assignments>> stableAssignmentsFutureAfterInvoke =
writeTableAssignmentsToMetastore(tableId, zoneDescriptor.consistencyMode(), stableAssignmentsFuture);

Expand All @@ -1674,6 +1689,7 @@ private CompletableFuture<?> createTableLocally(
zoneDescriptor,
stableAssignmentsFutureAfterInvoke,
pendingAssignments,
assignmentsChains,
onNodeRecovery,
catalog.time()
);
Expand All @@ -1696,6 +1712,7 @@ private CompletableFuture<Void> createTableLocally(
CatalogZoneDescriptor zoneDescriptor,
CompletableFuture<List<Assignments>> stableAssignmentsFuture,
List<Assignments> pendingAssignments,
List<AssignmentsChain> assignmentsChains,
boolean onNodeRecovery,
long assignmentsTimestamp
) {
Expand Down Expand Up @@ -1748,6 +1765,7 @@ private CompletableFuture<Void> createTableLocally(
return startLocalPartitionsAndClients(
stableAssignmentsFuture,
pendingAssignments,
assignmentsChains,
table,
onNodeRecovery,
assignmentsTimestamp
Expand Down Expand Up @@ -2152,7 +2170,9 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
TablePartitionId replicaGrpId = extractTablePartitionId(pendingAssignmentsEntry.key(), PENDING_ASSIGNMENTS_PREFIX_BYTES);

// Stable assignments from the meta store, whose revision is bounded by the current pending event.
Assignments stableAssignments = stableAssignments(replicaGrpId, revision);
Assignments stableAssignments = stableAssignments(metaStorageMgr, replicaGrpId, revision);

AssignmentsChain assignmentsChain = assignmentsChain(metaStorageMgr, replicaGrpId, revision);

Assignments pendingAssignments = Assignments.fromBytes(pendingAssignmentsEntry.value());

Expand Down Expand Up @@ -2195,6 +2215,7 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
table,
stableAssignments,
pendingAssignments,
assignmentsChain,
revision,
isRecovery
).thenAccept(v -> executeIfLocalNodeIsPrimaryForGroup(
Expand All @@ -2218,6 +2239,7 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
TableImpl tbl,
@Nullable Assignments stableAssignments,
Assignments pendingAssignments,
@Nullable AssignmentsChain assignmentsChain,
long revision,
boolean isRecovery
) {
Expand All @@ -2235,7 +2257,12 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
// `pending.contains(node) || (stable.contains(node) && !pending.isForce())`.
// This condition covers the left part of the OR expression.
// The right part of it is covered in `startLocalPartitionsAndClients`.
shouldStartLocalGroupNode = localMemberAssignmentInPending != null;
if (assignmentsChain == null || assignmentsChain.chain().size() == 1) {
shouldStartLocalGroupNode = localMemberAssignmentInPending != null;
} else {
// TODO: Use logic from https://issues.apache.org/jira/browse/IGNITE-23874.
shouldStartLocalGroupNode = localMemberAssignmentInPending != null;
}
} else {
shouldStartLocalGroupNode = localMemberAssignmentInPending != null && localMemberAssignmentInStable == null;
}
Expand Down Expand Up @@ -2980,7 +3007,7 @@ public CompletableFuture<Void> restartPartition(TablePartitionId tablePartitionI
TableImpl table = tables.get(tablePartitionId.tableId());

return stopPartitionForRestart(tablePartitionId, table).thenComposeAsync(unused1 -> {
Assignments stableAssignments = stableAssignments(tablePartitionId, revision);
Assignments stableAssignments = stableAssignments(metaStorageMgr, tablePartitionId, revision);

assert stableAssignments != null : "tablePartitionId=" + tablePartitionId + ", revision=" + revision;

Expand All @@ -3005,12 +3032,6 @@ public CompletableFuture<Void> restartPartition(TablePartitionId tablePartitionI
}), ioExecutor));
}

/**
 * Reads stable assignments of the given partition from meta storage locally, bounded by the given revision.
 *
 * @param tablePartitionId Table partition id.
 * @param revision Revision.
 * @return Stable assignments, or {@code null} if the entry is absent (null, empty or tombstone).
 */
private @Nullable Assignments stableAssignments(TablePartitionId tablePartitionId, long revision) {
    Entry entry = metaStorageMgr.getLocally(stablePartAssignmentsKey(tablePartitionId), revision);

    // The method is declared @Nullable, so report absence explicitly instead of deserializing a missing value.
    if (entry == null || entry.empty() || entry.tombstone()) {
        return null;
    }

    return Assignments.fromBytes(entry.value());
}

@Override
public void setStreamerReceiverRunner(StreamerReceiverRunner runner) {
this.streamerReceiverRunner = runner;
Expand Down