Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT: Producer side rate limiting #163

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c81351a
feat: producer side rate limiting
Sep 8, 2024
e57761a
add RL based on factor in producer layer and fixes
Sep 9, 2024
58e9c4d
fix tests
Sep 16, 2024
1200509
minor cleanup and refactor
Sep 16, 2024
a1402ee
major refactor rate limiter
iosdev747 Sep 23, 2024
725d8a8
cleanup
iosdev747 Sep 23, 2024
cfac26e
bug fixes in RL wrt conversion of counter to rates
iosdev747 Sep 25, 2024
89db234
optimise and restructure RL impl
iosdev747 Oct 5, 2024
77a748b
cleanup TODOs
iosdev747 Oct 5, 2024
71ff939
add NFR simulations and RL metrics
iosdev747 Oct 12, 2024
2e5ac88
Merge remote-tracking branch 'upstream/master' into rate_limit
iosdev747 Oct 25, 2024
e800581
add RL simulation test and refactor
iosdev747 Oct 25, 2024
ab7dd60
normalize error and decouple get SF call
iosdev747 Oct 29, 2024
08e2aa6
cleanup
iosdev747 Oct 29, 2024
b00d515
cleanup
iosdev747 Oct 29, 2024
752ca74
Refactor RL interfaces, add testcases and cleanup
iosdev747 Nov 6, 2024
4cdc0ba
merge upstream/master
iosdev747 Nov 6, 2024
10b0aa3
revert test changes
iosdev747 Nov 6, 2024
df60a01
rename package ratelimiter
iosdev747 Nov 6, 2024
d4f6df9
minor refactor
iosdev747 Nov 11, 2024
9746a5b
rename RL module
iosdev747 Nov 11, 2024
399ec5a
move classes to records
iosdev747 Nov 11, 2024
00a9dd3
merging master
gaurav-ashok Nov 12, 2024
bcf595c
remove server dependency from controller
iosdev747 Nov 13, 2024
e857f8c
Merge remote-tracking branch 'origin/rate_limit' into rate_limit
iosdev747 Nov 13, 2024
a1c5ea0
add tests and decouple metrics collection
iosdev747 Nov 20, 2024
bd30f66
update RL unit tests
iosdev747 Nov 20, 2024
db4a394
merge upstream/master
iosdev747 Nov 20, 2024
f92a292
fix tests
iosdev747 Nov 20, 2024
3668174
test debug statements
iosdev747 Nov 20, 2024
4202e4e
Revert "test debug statements"
iosdev747 Nov 20, 2024
68762e0
fix flaky test
iosdev747 Nov 20, 2024
03c11fc
remove simulations from test exec
iosdev747 Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .run/Dev, start pulsar and zk.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
</deployment>
<method v="2" />
</configuration>
</component>
</component>
35 changes: 35 additions & 0 deletions common/src/main/java/com/flipkart/varadhi/utils/FutureUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.flipkart.varadhi.utils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

/**
* This class is aimed at simplifying work with {@code CompletableFuture}.
*/
public class FutureUtil {

/**
* Return a future that represents the completion of the futures in the provided Collection.
*
* @param futures futures to wait for
* @return a new CompletableFuture that is completed when all the given CompletableFutures complete
*/
public static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
if (futures == null || futures.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

public static <T> CompletableFuture<List<T>> waitForAll(Stream<CompletableFuture<List<T>>> futures) {
return futures.reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {
preV.addAll(currV);
return preV;
})));
}

}
3 changes: 3 additions & 0 deletions controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ dependencies {
api(project(":entities"))
api(project(":spi"))
api(project(":core"))
api(project(':ratelimiter'))

implementation(project(":common"))
implementation("io.vertx:vertx-micrometer-metrics")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("org.apache.commons:commons-lang3")
testImplementation(testFixtures(project(":entities")))
testImplementation(testFixtures(project(":common")))
testImplementation("org.awaitility:awaitility:4.2.1")
testImplementation(project(':server'))
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.core.capacity.TopicCapacityService;
import com.flipkart.varadhi.qos.DistributedRateLimiter;
import com.flipkart.varadhi.qos.entity.*;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Deque;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableDouble;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class DistributedRateLimiterImpl implements DistributedRateLimiter {
private Map<String, ClientHistory> topicTrafficDataMap; // topic to client load info
private int windowSize;
private TopicCapacityService topicCapacityService;
private Clock clock;
private final LoadPrediction loadPredictor;

public DistributedRateLimiterImpl(int windowSize, TopicCapacityService topicCapacityService, Clock clock) {
this.topicTrafficDataMap = new ConcurrentHashMap<>();
this.windowSize = windowSize;
this.topicCapacityService = topicCapacityService;
this.clock = clock;
this.loadPredictor = new LoadPrediction() {
@Override
public List<TopicLoadInfo> predictLoad(Map<String, Deque<TopicLoadInfo>> records) {
List<TopicLoadInfo> recentRecords = new ArrayList<>();
records.forEach((clientId, history) -> {
if(!history.isEmpty()) {
TopicLoadInfo topicLoadInfo = history.peekLast();
if(topicLoadInfo != null) {
if(!isExpired(topicLoadInfo.getTo())) {
recentRecords.add(topicLoadInfo);
}
}
}
});
return recentRecords;
}
};
}

// Adds throughput for current client and returns the updated suppression factor for the topic
// TODO(rl): add NFR tests
// TODO(rl): cache for specific clientId and maintain a running agg.
public SuppressionFactor addTrafficData(String clientId, TopicLoadInfo topicLoadInfo) {
// check if clientId is already present in the list
String topic = topicLoadInfo.getTopicLoad().getTopic();
MutableDouble actualThroughout = new MutableDouble(0.0);
topicTrafficDataMap.compute(topic, (k, v) -> {
if (v == null) {
v = new ClientHistory(windowSize, loadPredictor);
}
v.add(clientId, topicLoadInfo);
actualThroughout.setValue(getThroughput(v));
return v;
});
int throughputBPS = topicCapacityService.getThroughputLimit(topic);
return new SuppressionFactor(calculateSuppressionFactor(throughputBPS, actualThroughout.getValue()));
}

// TODO(rl): remove client from here

private Double calculateSuppressionFactor(double limit, double actual) {
return Math.max(0, 1.0 - (limit / actual));
}

private Double getThroughput(ClientHistory clientsHistory) {
double totalThroughput = 0.0;
List<TopicLoadInfo> records = clientsHistory.getTotalLoad();
for(TopicLoadInfo record: records){
double windowSizeInSeconds = (double) (record.getTo() - record.getFrom()) / 1000;
totalThroughput += record.getTopicLoad().getBytesIn() / windowSizeInSeconds;
}
return totalThroughput;
}

/**
* check if record is older than windowSize
* @param time record time
* @return true if record is older than windowSize
*/
private boolean isExpired(long time) {
return (clock.millis() - windowSize * 1000L) > time;
}

@Override
public SuppressionData addTrafficData(ClientLoadInfo info) {
SuppressionData suppressionData = new SuppressionData();
info.getTopicUsageList().forEach((trafficData) -> {
SuppressionFactor suppressionFactor = addTrafficData(
info.getClientId(),
new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
);
log.debug("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor());
suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor);
});
return suppressionData;
}
}
Loading
Loading