Skip to content

Commit

Permalink
IGNITE-23086 Fix ItTxDistributedTestThreeNodesThreeReplicas.testDelet…
Browse files Browse the repository at this point in the history
…eUpsertAllRollback (#4302)
  • Loading branch information
sanpwc authored Aug 29, 2024
1 parent 3c43871 commit e2a7921
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ protected int replicas() {
@AfterEach
public void after() throws Exception {
try {
assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(accounts, 0), TimeUnit.SECONDS.toMillis(5)));
assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(customers, 0), TimeUnit.SECONDS.toMillis(5)));
assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(accounts, 0), TimeUnit.SECONDS.toMillis(10)));
assertTrue(IgniteTestUtils.waitForCondition(() -> assertPartitionsSame(customers, 0), TimeUnit.SECONDS.toMillis(10)));
} finally {
super.after();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ private CompletableFuture<TransactionResult> finishTransaction(
throw new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR, ex);
}

TransactionResult result = (TransactionResult) txOutcome;
TransactionResult result = (TransactionResult) ((ApplyCommandResult) txOutcome).result;

markFinished(txId, result.transactionState(), result.commitTimestamp());

Expand Down Expand Up @@ -2635,9 +2635,9 @@ private static <T> boolean allElementsAreNull(List<T> list) {
* </ul>
*
* @param cmd Raft command.
* @return Raft future.
* @return Raft future or raft decorated future with command that was processed.
*/
private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd, CompletableFuture<Object> resultFuture) {
private <T> CompletableFuture<T> applyCmdWithExceptionHandling(Command cmd, CompletableFuture<T> resultFuture) {
applyCmdWithRetryOnSafeTimeReorderException(cmd, resultFuture);

return resultFuture.exceptionally(throwable -> {
Expand Down Expand Up @@ -2680,7 +2680,7 @@ private <T> void applyCmdWithRetryOnSafeTimeReorderException(Command cmd, Comple
resultFuture.completeExceptionally(ex);
}
} else {
resultFuture.complete((T) res);
resultFuture.complete((T) new ApplyCommandResult<>(cmd, res));
}
});
}
Expand Down Expand Up @@ -2744,19 +2744,19 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateCommand(

return completedFuture(fut);
} else {
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
CompletableFuture<ApplyCommandResult<Object>> resultFuture = new CompletableFuture<>();

applyCmdWithExceptionHandling(cmd, resultFuture);

return resultFuture.thenCompose(res -> {
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult();

if (!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(txId, cmd.leaseStartTime(), updateCommandResult.currentLeaseStartTime());
}

if (updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
return safeTime.waitFor(((UpdateCommand) res.getCommand()).safeTime()).thenApply(ignored -> null);
} else {
if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK)) {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
Expand Down Expand Up @@ -2880,9 +2880,9 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
return completedFuture(fut);
}
} else {
return applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
return applyCmdWithExceptionHandling(cmd, new CompletableFuture<ApplyCommandResult<Object>>())
.thenCompose(res -> {
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;
UpdateCommandResult updateCommandResult = (UpdateCommandResult) res.getResult();

if (!updateCommandResult.isPrimaryReplicaMatch()) {
throw new PrimaryReplicaMissException(
Expand All @@ -2892,7 +2892,7 @@ private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
);
}
if (updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(cmd.safeTime()).thenApply(ignored -> null);
return safeTime.waitFor(((UpdateAllCommand) res.getCommand()).safeTime()).thenApply(ignored -> null);
} else {
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
Expand Down Expand Up @@ -4144,4 +4144,26 @@ private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<Tab

return result;
}

/**
* Wrapper for the update(All)Command processing result that besides result itself stores actual command that was processed. It helps to
* manage commands substitutions on SafeTimeReorderException where cloned command with adjusted safeTime is sent.
*/
private static class ApplyCommandResult<T> {
private final Command command;
private final T result;

ApplyCommandResult(Command command, T result) {
this.command = command;
this.result = result;
}

Command getCommand() {
return command;
}

T getResult() {
return result;
}
}
}

0 comments on commit e2a7921

Please sign in to comment.