From f7caa4bd16bd52e2aa26885d5ba344e44359ddea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Thu, 31 Oct 2024 18:03:01 +0100 Subject: [PATCH] GH-389: Fix MockReceiverTest#consumerMethods failure with reactor-core 3.5 Fixes: #389 Due to the change between `3.4.x` and `3.5.x` of `reactor-core`, the test would fail. The reason is that since `3.5` the prefetching strategy is less eager and the consumption would be paused due to lack of downstream requests. The assertion was to strict as in such scenarios not only the intended partition is paused but all of them. This relaxation makes the test pass against future reactor-core versions. --- .../reactor/kafka/receiver/internals/MockReceiverTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/test/java/reactor/kafka/receiver/internals/MockReceiverTest.java b/src/test/java/reactor/kafka/receiver/internals/MockReceiverTest.java index 525cab01..0c93c6fd 100644 --- a/src/test/java/reactor/kafka/receiver/internals/MockReceiverTest.java +++ b/src/test/java/reactor/kafka/receiver/internals/MockReceiverTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,6 +70,7 @@ import java.util.function.Function; import java.util.regex.Pattern; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1188,7 +1189,8 @@ public void consumerMethods() throws Exception { testConsumerMethod(c -> { Collection partitions = Collections.singleton(new TopicPartition(topic, 1)); c.pause(partitions); - assertEquals(partitions, c.paused()); + // Due to backpressure other partitions can be paused at the same moment in time + assertThat(c.paused()).containsAll(partitions); c.resume(partitions); }); testConsumerMethod(c -> {