Skip to content

Commit

Permalink
IGNITE-23131 java.lang.AssertionError: The local node is outside of t…
Browse files Browse the repository at this point in the history
…he replication group (#4396)
  • Loading branch information
JAkutenshi authored Sep 23, 2024
1 parent 0876244 commit 6d47468
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,9 @@ private void waitForMetadataCompletenessAtNow() {
* Starts the created components.
*/
void start() {
List<IgniteComponent> firstComponents = List.of(
ComponentContext componentContext = new ComponentContext();

deployWatchesFut = startComponentsAsync(componentContext, List.of(
threadPoolsManager,
vaultManager,
nodeCfgMgr,
Expand All @@ -1303,54 +1305,44 @@ void start() {
cmgLogStorageFactory,
raftManager,
cmgManager
);

ComponentContext componentContext = new ComponentContext();

List<CompletableFuture<?>> componentFuts =
firstComponents.stream()
.map(component -> component.startAsync(componentContext))
.collect(Collectors.toList());

nodeComponents.addAll(firstComponents);

deployWatchesFut = CompletableFuture.supplyAsync(() -> {
List<IgniteComponent> secondComponents = List.of(
lowWatermark,
metaStorageManager,
clusterCfgMgr,
clockWaiter,
catalogManager,
indexMetaStorage,
distributionZoneManager,
replicaManager,
txManager,
dataStorageMgr,
schemaManager,
partitionReplicaLifecycleManager,
tableManager,
indexManager
);

componentFuts.addAll(secondComponents.stream()
.map(component -> component.startAsync(componentContext)).collect(Collectors.toList()));

nodeComponents.addAll(secondComponents);

var configurationNotificationFut = metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
return allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart()
);
});
)).thenApplyAsync(v -> startComponentsAsync(componentContext, List.of(
lowWatermark,
metaStorageManager,
clusterCfgMgr,
clockWaiter,
catalogManager,
indexMetaStorage,
distributionZoneManager,
replicaManager,
txManager,
dataStorageMgr,
schemaManager,
partitionReplicaLifecycleManager,
tableManager,
indexManager
))).thenComposeAsync(componentFuts -> {
CompletableFuture<Void> configurationNotificationFut = metaStorageManager.recoveryFinishedFuture()
.thenCompose(rev -> allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart(),
componentFuts
));

assertThat(configurationNotificationFut, willSucceedIn(1, TimeUnit.MINUTES));

lowWatermark.scheduleUpdates();

return metaStorageManager.deployWatches();
}).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)), (deployWatchesFut, unused) -> null);
});
}

private CompletableFuture<Void> startComponentsAsync(ComponentContext componentContext, List<IgniteComponent> components) {
nodeComponents.addAll(components);

return allOf(components.stream()
.map(component -> component.startAsync(componentContext))
.toArray(CompletableFuture[]::new));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,7 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
.thenCompose(ignored -> processAssignmentsOnRecovery(recoveryRevision));

metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), pendingAssignmentsRebalanceListener);

metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener);

metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), assignmentsSwitchRebalanceListener);

catalogMgr.listen(ZONE_CREATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.client.handler.configuration.ClientConnectorExtensionConfigurationSchema;
import org.apache.ignite.internal.app.ThreadPoolsManager;
Expand Down Expand Up @@ -1493,7 +1492,9 @@ private void waitForMetadataCompletenessAtNow() {
* Starts the created components.
*/
void start() {
List<IgniteComponent> firstComponents = List.of(
ComponentContext componentContext = new ComponentContext();

deployWatchesFut = startComponentsAsync(componentContext, List.of(
threadPoolsManager,
vaultManager,
nodeCfgMgr,
Expand All @@ -1504,53 +1505,43 @@ void start() {
msLogStorageFactory,
raftManager,
cmgManager
);

ComponentContext componentContext = new ComponentContext();
List<CompletableFuture<?>> componentFuts =
firstComponents.stream()
.map(component -> component.startAsync(componentContext))
.collect(Collectors.toList());

nodeComponents.addAll(firstComponents);

deployWatchesFut = CompletableFuture.supplyAsync(() -> {
List<IgniteComponent> secondComponents = List.of(
lowWatermark,
metaStorageManager,
clusterCfgMgr,
clockWaiter,
catalogManager,
indexMetaStorage,
distributionZoneManager,
replicaManager,
txManager,
dataStorageMgr,
schemaManager,
tableManager,
indexManager
);

componentFuts.addAll(secondComponents.stream()
.map(component -> component.startAsync(componentContext))
.collect(Collectors.toList()));

nodeComponents.addAll(secondComponents);

var configurationNotificationFut = metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
return allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart()
);
});
)).thenApplyAsync(v -> startComponentsAsync(componentContext, List.of(
lowWatermark,
metaStorageManager,
clusterCfgMgr,
clockWaiter,
catalogManager,
indexMetaStorage,
distributionZoneManager,
replicaManager,
txManager,
dataStorageMgr,
schemaManager,
tableManager,
indexManager
))).thenComposeAsync(componentFuts -> {
CompletableFuture<Void> configurationNotificationFut = metaStorageManager.recoveryFinishedFuture()
.thenCompose(rev -> allOf(
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart(),
componentFuts
));

assertThat(configurationNotificationFut, willSucceedIn(1, TimeUnit.MINUTES));

lowWatermark.scheduleUpdates();

return metaStorageManager.deployWatches();
}).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)), (deployWatchesFut, unused) -> null);
});
}

private CompletableFuture<Void> startComponentsAsync(ComponentContext componentContext, List<IgniteComponent> components) {
nodeComponents.addAll(components);

return allOf(components.stream()
.map(component -> component.startAsync(componentContext))
.toArray(CompletableFuture[]::new));
}

/**
Expand Down

0 comments on commit 6d47468

Please sign in to comment.