Skip to content

Commit

Permalink
[Fix][broker] Limit replication rate based on bytes (apache#22674)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored Jun 28, 2024
1 parent 4e535cb commit 7f4c0c5
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
Expand Down Expand Up @@ -199,15 +201,31 @@ protected void disableReplicatorRead() {
this.cursor.setInactive();
}

/**
 * Read budget for the next replicator read, expressed as a (messages, bytes) pair.
 *
 * <p>Meaning of the pair:
 * <ul>
 *   <li>{@code 0, 0}: producer queue is full, no permits.</li>
 *   <li>{@code -1, -1}: rate limiter has reached its limit.</li>
 *   <li>{@code >0, >0}: permits available for reading entries.</li>
 * </ul>
 */
@Data
@AllArgsConstructor
private static class AvailablePermits {
    private int messages;
    private long bytes;

    /**
     * @return {@code true} only when both values carry the {@code -1} marker
     */
    public boolean isExceeded() {
        if (messages != -1) {
            return false;
        }
        return bytes == -1;
    }

    /**
     * @return {@code true} only when both the message and the byte budget are strictly positive
     */
    public boolean isReadable() {
        if (messages <= 0) {
            return false;
        }
        return bytes > 0;
    }
}

/**
* Calculate available permits for read entries.
*
* @return
* 0: Producer queue is full, no permits.
* -1: Rate Limiter reaches limit.
* >0: available permits for read entries.
*/
private int getAvailablePermits() {
private AvailablePermits getAvailablePermits() {
int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);

// return 0, if Producer queue is full, it will pause read entries.
Expand All @@ -216,15 +234,18 @@ private int getAvailablePermits() {
log.debug("[{}] Producer queue is full, availablePermits: {}, pause reading",
replicatorId, availablePermits);
}
return 0;
return new AvailablePermits(0, 0);
}

long availablePermitsOnMsg = -1;
long availablePermitsOnByte = -1;

// handle rate limit
if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
DispatchRateLimiter rateLimiter = dispatchRateLimiter.get();
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg();
long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte();
availablePermitsOnMsg = rateLimiter.getAvailableDispatchRateLimitOnMsg();
availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte();
// no permits from rate limit
if (availablePermitsOnByte == 0 || availablePermitsOnMsg == 0) {
if (log.isDebugEnabled()) {
Expand All @@ -235,25 +256,29 @@ private int getAvailablePermits() {
rateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
return -1;
}
if (availablePermitsOnMsg > 0) {
availablePermits = Math.min(availablePermits, (int) availablePermitsOnMsg);
return new AvailablePermits(-1, -1);
}
}

return availablePermits;
availablePermitsOnMsg =
availablePermitsOnMsg == -1 ? availablePermits : Math.min(availablePermits, availablePermitsOnMsg);
availablePermitsOnMsg = Math.min(availablePermitsOnMsg, readBatchSize);

availablePermitsOnByte =
availablePermitsOnByte == -1 ? readMaxSizeBytes : Math.min(readMaxSizeBytes, availablePermitsOnByte);

return new AvailablePermits((int) availablePermitsOnMsg, availablePermitsOnByte);
}

protected void readMoreEntries() {
if (fetchSchemaInProgress) {
log.info("[{}] Skip the reading due to new detected schema", replicatorId);
return;
}
int availablePermits = getAvailablePermits();

if (availablePermits > 0) {
int messagesToRead = Math.min(availablePermits, readBatchSize);
AvailablePermits availablePermits = getAvailablePermits();
if (availablePermits.isReadable()) {
int messagesToRead = availablePermits.getMessages();
long bytesToRead = availablePermits.getBytes();
if (!isWritable()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Throttling replication traffic because producer is not writable", replicatorId);
Expand All @@ -262,23 +287,21 @@ protected void readMoreEntries() {
messagesToRead = 1;
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);

// Schedule read
if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages", replicatorId, messagesToRead);
log.debug("[{}] Schedule read of {} messages or {} bytes", replicatorId, messagesToRead,
bytesToRead);
}
cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this,
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
null, topic.getMaxReadPosition());
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Not scheduling read due to pending read. Messages To Read {}",
replicatorId, messagesToRead);
log.debug("[{}] Not scheduling read due to pending read. Messages To Read {}, Bytes To Read {}",
replicatorId, messagesToRead, bytesToRead);
}
}
} else if (availablePermits == -1) {
} else if (availablePermits.isExceeded()) {
// no permits from rate limit
topic.getBrokerService().executor().schedule(
() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -548,5 +549,64 @@ public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Excepti
producer.close();
}

/**
 * Checks that a byte-based replicator dispatch rate throttles cross-cluster replication.
 *
 * <p>The namespace is replicated between clusters r1 and r2 with a replicator dispatch rate of
 * {@code byteRate} bytes per 360-second period and the message rate set to -1. The test publishes
 * five times the byte limit in {@code payloadSize}-byte messages on cluster 1, waits 5 seconds,
 * and asserts that the consumer on cluster 2 has received fewer than
 * {@code byteRate / payloadSize + 2} messages.
 */
@Test
public void testReplicatorRateLimiterByBytes() throws Exception {
    final String namespace = "pulsar/replicatormsg-" + System.currentTimeMillis();
    final String topicName = "persistent://" + namespace + "/RateLimiterByBytes";

    admin1.namespaces().createNamespace(namespace);
    // 0. set 2 clusters, there will be 1 replicator in each topic
    admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

    final int byteRate = 400;
    final int payloadSize = 100;
    // Throttle on bytes only; the rate period (360s) is far longer than the 5s the test waits below.
    DispatchRate dispatchRate = DispatchRate.builder()
            .dispatchThrottlingRateInMsg(-1)
            .dispatchThrottlingRateInByte(byteRate)
            .ratePeriodInSecond(360)
            .build();
    admin1.namespaces().setReplicatorDispatchRate(namespace, dispatchRate);

    @Cleanup
    PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
    @Cleanup
    Producer<byte[]> producer = client1.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

    // Wait until the replicator has a rate limiter, then confirm it carries the configured byte rate.
    Awaitility.await()
            .untilAsserted(() -> assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent()));
    assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), byteRate);

    @Cleanup
    PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
            .build();
    final AtomicInteger totalReceived = new AtomicInteger(0);

    // Count every message that reaches the remote cluster.
    @Cleanup
    Consumer<byte[]> ignored = client2.newConsumer().topic(topicName).subscriptionName("sub2-in-cluster2")
            .messageListener((c1, msg) -> {
                Assert.assertNotNull(msg, "Message cannot be null");
                String receivedMessage = new String(msg.getData());
                log.debug("Received message [{}] in the listener", receivedMessage);
                totalReceived.incrementAndGet();
            }).subscribe();

    // The total bytes is 5 times the rate limit value: numMessages payloads of payloadSize bytes each.
    int numMessages = byteRate / payloadSize * 5;
    for (int i = 0; i < numMessages; i++) {
        producer.send(new byte[payloadSize]);
    }

    Awaitility.await().pollDelay(5, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                // The rate limit takes effect in the next reading cycle, so allow a small tolerance
                // above the exact byteRate / payloadSize message count.
                assertThat(totalReceived.get()).isLessThan((byteRate / payloadSize) + 2);
            });
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);
}

0 comments on commit 7f4c0c5

Please sign in to comment.