diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 9aec93cf1ffb3..392ec0d3ff46f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -20,6 +20,10 @@ import com.google.common.collect.Sets; import io.prometheus.client.Summary; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; @@ -45,11 +49,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - // The tests implement a set of producer/consumer operations on a set of topics. // [A thread is started for each producer, and each consumer in the test.] @@ -57,6 +56,7 @@ // After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics // are verified on the RGs. @Slf4j +@Test(groups = "flaky") public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { @BeforeClass @Override @@ -119,9 +119,9 @@ private class ProduceMessages implements Runnable { private final int numMesgsToProduce; private final String myProduceTopic; - private int sentNumBytes = 0; - private int sentNumMsgs = 0; - private int numExceptions = 0; + private volatile int sentNumBytes = 0; + private volatile int sentNumMsgs = 0; + private volatile int numExceptions = 0; ProduceMessages(int prodId, int nMesgs, String[] topics) { producerId = prodId; @@ -202,9 +202,9 @@ private class ConsumeMessages implements Runnable { private final int recvTimeoutMilliSecs = 1000; private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second - private int recvdNumBytes = 0; - private int recvdNumMsgs = 0; - private int numExceptions = 0; + private volatile int recvdNumBytes = 0; + private volatile int recvdNumMsgs = 0; + private volatile int numExceptions = 0; private volatile boolean allMessagesReceived = false; private volatile boolean consumerIsReady = false; @@ -494,15 +494,15 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception while (numConsumersDone < NUM_CONSUMERS) { for (int ix = 0; ix < NUM_CONSUMERS; ix++) { if (!joinedConsumers[ix]) { + consThr[ix].thread.join(); + joinedConsumers[ix] = true; + log.debug("Joined consumer={}", ix); + recvdBytes = consThr[ix].consumer.getNumBytesRecvd(); recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd(); numConsumerExceptions += consThr[ix].consumer.getNumExceptions(); log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes); - consThr[ix].thread.join(); - joinedConsumers[ix] = true; - log.debug("Joined consumer={}", ix); - recvdNumBytes += recvdBytes; recvdNumMsgs += recvdMsgs; numConsumersDone++;