diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java index 333ae059a90..02a0a1a84fa 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java @@ -63,8 +63,8 @@ protected int replicas() { @AfterEach public void after() throws Exception { try { - assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(accounts, 0), TimeUnit.SECONDS.toMillis(5))); - assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(customers, 0), TimeUnit.SECONDS.toMillis(5))); + assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(accounts, 0), TimeUnit.SECONDS.toMillis(10))); + assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(customers, 0), TimeUnit.SECONDS.toMillis(10))); } finally { super.after(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 69f61e7a997..3936743bb3e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1755,7 +1755,7 @@ private CompletableFuture finishTransaction( throw new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR, ex); } - TransactionResult result = (TransactionResult) txOutcome; + TransactionResult result = (TransactionResult) ((ApplyCommandResult) txOutcome).result; markFinished(txId, result.transactionState(), result.commitTimestamp()); @@ -2635,9 +2635,9 @@ private static boolean allElementsAreNull(List list) { * * * @param cmd Raft command. - * @return Raft future. + * @return Raft future or raft decorated future with command that was processed. */ - private CompletableFuture applyCmdWithExceptionHandling(Command cmd, CompletableFuture resultFuture) { + private CompletableFuture applyCmdWithExceptionHandling(Command cmd, CompletableFuture resultFuture) { applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture); return resultFuture.exceptionally(throwable -> { @@ -2680,7 +2680,7 @@ private void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, Comple resultFuture.completeExceptionally(ex); } } else { - resultFuture.complete((T) res); + resultFuture.complete((T) new ApplyCommandResult<>(cmd, res)); } }); } @@ -2744,19 +2744,19 @@ private CompletableFuture> applyUpdateCommand( return completedFuture(fut); } else { - CompletableFuture resultFuture = new CompletableFuture<>(); + CompletableFuture> resultFuture = new CompletableFuture<>(); applyCmdWithExceptionHandling(cmd, resultFuture); return resultFuture.thenCompose(res -> { - UpdateCommandResult updateCommandResult = (UpdateCommandResult) res; + UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult(); if (!updateCommandResult.isPrimaryReplicaMatch()) { throw new PrimaryReplicaMissException(txId, cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime()); } if (updateCommandResult.isPrimaryInPeersAndLearners()) { - return safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null); + return safeTime.waitFor(((UpdateCommand) res.getCommand()).safeTime()).thenApply(ignored -> null); } else { if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK)) { // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. @@ -2880,9 +2880,9 @@ private CompletableFuture> applyUpdateAllCommand( return completedFuture(fut); } } else { - return applyCmdWithExceptionHandling(cmd, new CompletableFuture<>()) + return applyCmdWithExceptionHandling(cmd, new CompletableFuture>()) .thenCompose(res -> { - UpdateCommandResult updateCommandResult = (UpdateCommandResult) res; + UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult(); if (!updateCommandResult.isPrimaryReplicaMatch()) { throw new PrimaryReplicaMissException( @@ -2892,7 +2892,7 @@ private CompletableFuture> applyUpdateAllCommand( ); } if (updateCommandResult.isPrimaryInPeersAndLearners()) { - return safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null); + return safeTime.waitFor(((UpdateAllCommand) res.getCommand()).safeTime()).thenApply(ignored -> null); } else { // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdateAll( @@ -4144,4 +4144,26 @@ private static Map asTablePartitionIdStringMap(Map { + private final Command command; + private final T result; + + ApplyCommandResult(Command command, T result) { + this.command = command; + this.result = result; + } + + Command getCommand() { + return command; + } + + T getResult() { + return result; + } + } }