diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index aa53a93da5c4f..54b8993784e29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -34,6 +34,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -199,15 +201,31 @@ protected void disableReplicatorRead() { this.cursor.setInactive(); } + @Data + @AllArgsConstructor + private static class AvailablePermits { + private int messages; + private long bytes; + + /** + * messages, bytes + * 0, O: Producer queue is full, no permits. + * -1, -1: Rate Limiter reaches limit. + * >0, >0: available permits for read entries. + */ + public boolean isExceeded() { + return messages == -1 && bytes == -1; + } + + public boolean isReadable() { + return messages > 0 && bytes > 0; + } + } + /** * Calculate available permits for read entries. - * - * @return - * 0: Producer queue is full, no permits. - * -1: Rate Limiter reaches limit. - * >0: available permits for read entries. */ - private int getAvailablePermits() { + private AvailablePermits getAvailablePermits() { int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this); // return 0, if Producer queue is full, it will pause read entries. @@ -216,15 +234,18 @@ private int getAvailablePermits() { log.debug("[{}] Producer queue is full, availablePermits: {}, pause reading", replicatorId, availablePermits); } - return 0; + return new AvailablePermits(0, 0); } + long availablePermitsOnMsg = -1; + long availablePermitsOnByte = -1; + // handle rate limit if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) { DispatchRateLimiter rateLimiter = dispatchRateLimiter.get(); // if dispatch-rate is in msg then read only msg according to available permit - long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); - long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); + availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg(); + availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); // no permits from rate limit if (availablePermitsOnByte == 0 || availablePermitsOnMsg == 0) { if (log.isDebugEnabled()) { @@ -235,14 +256,18 @@ private int getAvailablePermits() { rateLimiter.getDispatchRateOnByte(), MESSAGE_RATE_BACKOFF_MS); } - return -1; - } - if (availablePermitsOnMsg > 0) { - availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg); + return new AvailablePermits(-1, -1); } } - return availablePermits; + availablePermitsOnMsg = + availablePermitsOnMsg == -1 ? availablePermits : Math.min(availablePermits, availablePermitsOnMsg); + availablePermitsOnMsg = Math.min(availablePermitsOnMsg, readBatchSize); + + availablePermitsOnByte = + availablePermitsOnByte == -1 ? readMaxSizeBytes : Math.min(readMaxSizeBytes, availablePermitsOnByte); + + return new AvailablePermits((int) availablePermitsOnMsg, availablePermitsOnByte); } protected void readMoreEntries() { @@ -250,10 +275,10 @@ protected void readMoreEntries() { log.info("[{}] Skip the reading due to new detected schema", replicatorId); return; } - int availablePermits = getAvailablePermits(); - - if (availablePermits > 0) { - int messagesToRead = Math.min(availablePermits, readBatchSize); + AvailablePermits availablePermits = getAvailablePermits(); + if (availablePermits.isReadable()) { + int messagesToRead = availablePermits.getMessages(); + long bytesToRead = availablePermits.getBytes(); if (!isWritable()) { if (log.isDebugEnabled()) { log.debug("[{}] Throttling replication traffic because producer is not writable", replicatorId); @@ -262,23 +287,21 @@ protected void readMoreEntries() { messagesToRead = 1; } - // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException - messagesToRead = Math.max(messagesToRead, 1); - // Schedule read if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { if (log.isDebugEnabled()) { - log.debug("[{}] Schedule read of {} messages", replicatorId, messagesToRead); + log.debug("[{}] Schedule read of {} messages or {} bytes", replicatorId, messagesToRead, + bytesToRead); } - cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, + cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, null, topic.getMaxReadPosition()); } else { if (log.isDebugEnabled()) { - log.debug("[{}] Not scheduling read due to pending read. Messages To Read {}", - replicatorId, messagesToRead); + log.debug("[{}] Not scheduling read due to pending read. Messages To Read {}, Bytes To Read {}", + replicatorId, messagesToRead, bytesToRead); } } - } else if (availablePermits == -1) { + } else if (availablePermits.isExceeded()) { // no permits from rate limit topic.getBrokerService().executor().schedule( () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 747ef3b7f5ce8..90df16360614d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -548,5 +549,64 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti producer.close(); } + @Test + public void testReplicatorRateLimiterByBytes() throws Exception { + final String namespace = "pulsar/replicatormsg-" + System.currentTimeMillis(); + final String topicName = "persistent://" + namespace + "/RateLimiterByBytes"; + + admin1.namespaces().createNamespace(namespace); + // 0. set 2 clusters, there will be 1 replicator in each topic + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + final int byteRate = 400; + final int payloadSize = 100; + DispatchRate dispatchRate = DispatchRate.builder() + .dispatchThrottlingRateInMsg(-1) + .dispatchThrottlingRateInByte(byteRate) + .ratePeriodInSecond(360) + .build(); + admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); + @Cleanup + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get(); + + Awaitility.await() + .untilAsserted(() -> assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent())); + assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), byteRate); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .build(); + final AtomicInteger totalReceived = new AtomicInteger(0); + + @Cleanup + Consumer ignored = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2") + .messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + totalReceived.incrementAndGet(); + }).subscribe(); + + // The total bytes is 5 times the rate limit value. + int numMessages = byteRate / payloadSize * 5; + for (int i = 0; i < numMessages * payloadSize; i++) { + producer.send(new byte[payloadSize]); + } + + Awaitility.await().pollDelay(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + // The rate limit occurs in the next reading cycle, so a value fault tolerance needs to be added. + assertThat(totalReceived.get()).isLessThan((byteRate / payloadSize) + 2); + }); + } + private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class); }