From 6d47468ce1ec1ee584d23b4e6427edcef73526b8 Mon Sep 17 00:00:00 2001 From: Mikhail Efremov Date: Mon, 23 Sep 2024 20:41:09 +0600 Subject: [PATCH] IGNITE-23131 java.lang.AssertionError: The local node is outside of the replication group (#4396) --- .../replicator/ItReplicaLifecycleTest.java | 78 +++++++++---------- .../PartitionReplicaLifecycleManager.java | 2 - .../rebalance/ItRebalanceDistributedTest.java | 77 ++++++++---------- 3 files changed, 69 insertions(+), 88 deletions(-) diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index 6cdba72af1e..5b9cb27114b 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -1292,7 +1292,9 @@ private void waitForMetadataCompletenessAtNow() { * Starts the created components. */ void start() { - List firstComponents = List.of( + ComponentContext componentContext = new ComponentContext(); + + deployWatchesFut = startComponentsAsync(componentContext, List.of( threadPoolsManager, vaultManager, nodeCfgMgr, @@ -1303,54 +1305,44 @@ void start() { cmgLogStorageFactory, raftManager, cmgManager - ); - - ComponentContext componentContext = new ComponentContext(); - - List> componentFuts = - firstComponents.stream() - .map(component -> component.startAsync(componentContext)) - .collect(Collectors.toList()); - - nodeComponents.addAll(firstComponents); - - deployWatchesFut = CompletableFuture.supplyAsync(() -> { - List secondComponents = List.of( - lowWatermark, - metaStorageManager, - clusterCfgMgr, - clockWaiter, - catalogManager, - indexMetaStorage, - distributionZoneManager, - replicaManager, - txManager, - dataStorageMgr, - schemaManager, - partitionReplicaLifecycleManager, - tableManager, - indexManager - ); - - componentFuts.addAll(secondComponents.stream() - .map(component -> component.startAsync(componentContext)).collect(Collectors.toList())); - - nodeComponents.addAll(secondComponents); - - var configurationNotificationFut = metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> { - return allOf( - nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), - clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), - ((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart() - ); - }); + )).thenApplyAsync(v -> startComponentsAsync(componentContext, List.of( + lowWatermark, + metaStorageManager, + clusterCfgMgr, + clockWaiter, + catalogManager, + indexMetaStorage, + distributionZoneManager, + replicaManager, + txManager, + dataStorageMgr, + schemaManager, + partitionReplicaLifecycleManager, + tableManager, + indexManager + ))).thenComposeAsync(componentFuts -> { + CompletableFuture configurationNotificationFut = metaStorageManager.recoveryFinishedFuture() + .thenCompose(rev -> allOf( + nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), + clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), + ((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart(), + componentFuts + )); assertThat(configurationNotificationFut, willSucceedIn(1, TimeUnit.MINUTES)); lowWatermark.scheduleUpdates(); return metaStorageManager.deployWatches(); - }).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)), (deployWatchesFut, unused) -> null); + }); + } + + private CompletableFuture startComponentsAsync(ComponentContext componentContext, List components) { + nodeComponents.addAll(components); + + return allOf(components.stream() + .map(component -> component.startAsync(componentContext)) + .toArray(CompletableFuture[]::new)); } /** diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java index 58de200b969..9457884705f 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java @@ -268,9 +268,7 @@ public CompletableFuture startAsync(ComponentContext componentContext) { .thenCompose(ignored -> processAssignmentsOnRecovery(recoveryRevision)); metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX), pendingAssignmentsRebalanceListener); - metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), stableAssignmentsRebalanceListener); - metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), assignmentsSwitchRebalanceListener); catalogMgr.listen(ZONE_CREATE, diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 06b48b80856..9eccf3b699c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -92,7 +92,6 @@ import java.util.function.LongFunction; import java.util.function.LongSupplier; import java.util.function.Predicate; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.client.handler.configuration.ClientConnectorExtensionConfigurationSchema; import org.apache.ignite.internal.app.ThreadPoolsManager; @@ -1493,7 +1492,9 @@ private void waitForMetadataCompletenessAtNow() { * Starts the created components. */ void start() { - List firstComponents = List.of( + ComponentContext componentContext = new ComponentContext(); + + deployWatchesFut = startComponentsAsync(componentContext, List.of( threadPoolsManager, vaultManager, nodeCfgMgr, @@ -1504,53 +1505,43 @@ void start() { msLogStorageFactory, raftManager, cmgManager - ); - - ComponentContext componentContext = new ComponentContext(); - List> componentFuts = - firstComponents.stream() - .map(component -> component.startAsync(componentContext)) - .collect(Collectors.toList()); - - nodeComponents.addAll(firstComponents); - - deployWatchesFut = CompletableFuture.supplyAsync(() -> { - List secondComponents = List.of( - lowWatermark, - metaStorageManager, - clusterCfgMgr, - clockWaiter, - catalogManager, - indexMetaStorage, - distributionZoneManager, - replicaManager, - txManager, - dataStorageMgr, - schemaManager, - tableManager, - indexManager - ); - - componentFuts.addAll(secondComponents.stream() - .map(component -> component.startAsync(componentContext)) - .collect(Collectors.toList())); - - nodeComponents.addAll(secondComponents); - - var configurationNotificationFut = metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> { - return allOf( - nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), - clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), - ((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart() - ); - }); + )).thenApplyAsync(v -> startComponentsAsync(componentContext, List.of( + lowWatermark, + metaStorageManager, + clusterCfgMgr, + clockWaiter, + catalogManager, + indexMetaStorage, + distributionZoneManager, + replicaManager, + txManager, + dataStorageMgr, + schemaManager, + tableManager, + indexManager + ))).thenComposeAsync(componentFuts -> { + CompletableFuture configurationNotificationFut = metaStorageManager.recoveryFinishedFuture() + .thenCompose(rev -> allOf( + nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), + clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(), + ((MetaStorageManagerImpl) metaStorageManager).notifyRevisionUpdateListenerOnStart(), + componentFuts + )); assertThat(configurationNotificationFut, willSucceedIn(1, TimeUnit.MINUTES)); lowWatermark.scheduleUpdates(); return metaStorageManager.deployWatches(); - }).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)), (deployWatchesFut, unused) -> null); + }); + } + + private CompletableFuture startComponentsAsync(ComponentContext componentContext, List components) { + nodeComponents.addAll(components); + + return allOf(components.stream() + .map(component -> component.startAsync(componentContext)) + .toArray(CompletableFuture[]::new)); } /**