Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT: Producer side rate limiting #163

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c81351a
feat: producer side rate limiting
Sep 8, 2024
e57761a
add RL based on factor in producer layer and fixes
Sep 9, 2024
58e9c4d
fix tests
Sep 16, 2024
1200509
minor cleanup and refactor
Sep 16, 2024
a1402ee
major refactor rate limiter
iosdev747 Sep 23, 2024
725d8a8
cleanup
iosdev747 Sep 23, 2024
cfac26e
bug fixes in RL wrt conversion of counter to rates
iosdev747 Sep 25, 2024
89db234
optimise and restructure RL impl
iosdev747 Oct 5, 2024
77a748b
cleanup TODOs
iosdev747 Oct 5, 2024
71ff939
add NFR simulations and RL metrics
iosdev747 Oct 12, 2024
2e5ac88
Merge remote-tracking branch 'upstream/master' into rate_limit
iosdev747 Oct 25, 2024
e800581
add RL simulation test and refactor
iosdev747 Oct 25, 2024
ab7dd60
normalize error and decouple get SF call
iosdev747 Oct 29, 2024
08e2aa6
cleanup
iosdev747 Oct 29, 2024
b00d515
cleanup
iosdev747 Oct 29, 2024
752ca74
Refactor RL interfaces, add testcases and cleanup
iosdev747 Nov 6, 2024
4cdc0ba
merge upstream/master
iosdev747 Nov 6, 2024
10b0aa3
revert test changes
iosdev747 Nov 6, 2024
df60a01
rename package ratelimiter
iosdev747 Nov 6, 2024
d4f6df9
minor refactor
iosdev747 Nov 11, 2024
9746a5b
rename RL module
iosdev747 Nov 11, 2024
399ec5a
move classes to records
iosdev747 Nov 11, 2024
00a9dd3
merging master
gaurav-ashok Nov 12, 2024
bcf595c
remove server dependency from controller
iosdev747 Nov 13, 2024
e857f8c
Merge remote-tracking branch 'origin/rate_limit' into rate_limit
iosdev747 Nov 13, 2024
a1c5ea0
add tests and decouple metrics collection
iosdev747 Nov 20, 2024
bd30f66
update RL unit tests
iosdev747 Nov 20, 2024
db4a394
merge upstream/master
iosdev747 Nov 20, 2024
f92a292
fix tests
iosdev747 Nov 20, 2024
3668174
test debug statements
iosdev747 Nov 20, 2024
4202e4e
Revert "test debug statements"
iosdev747 Nov 20, 2024
68762e0
fix flaky test
iosdev747 Nov 20, 2024
03c11fc
remove simulations from test exec
iosdev747 Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.flipkart.varadhi.cluster.messages.ClusterMessage;
import com.flipkart.varadhi.cluster.messages.ResponseMessage;
import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.flipkart.varadhi.utils.JsonMapper;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
Expand Down Expand Up @@ -48,7 +47,13 @@ public CompletableFuture<Void> send(String routeName, String apiName, ClusterMes
}

public void publish(String routeName, String apiName, ClusterMessage msg) {
throw new NotImplementedException("publish not implemented");
String apiPath = getPath(routeName, apiName, RouteMethod.PUBLISH);
try {
vertxEventBus.publish(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions);
log.debug("publish({}, {}) delivered.", apiPath, msg.getId());
} catch (Exception e) {
log.error("publish({}, {}) Unexpected failure:{}", apiPath, msg.getId(), e.getMessage());
}
}

public CompletableFuture<ResponseMessage> request(String routeName, String apiName, ClusterMessage msg) {
Expand All @@ -57,7 +62,7 @@ public CompletableFuture<ResponseMessage> request(String routeName, String apiNa
try {
vertxEventBus.request(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions, ar -> {
if (ar.succeeded()) {
log.debug("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body());
log.info("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body());
ResponseMessage response =
JsonMapper.jsonDeserialize((String) ar.result().body(), ResponseMessage.class);
if (response.getException() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.flipkart.varadhi.cluster;


import com.flipkart.varadhi.cluster.messages.*;
import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.flipkart.varadhi.cluster.messages.ClusterMessage;
import com.flipkart.varadhi.cluster.messages.MsgHandler;
import com.flipkart.varadhi.cluster.messages.RequestHandler;
import com.flipkart.varadhi.cluster.messages.ResponseMessage;
import com.flipkart.varadhi.exceptions.VaradhiException;
import com.flipkart.varadhi.utils.JsonMapper;
import io.vertx.core.eventbus.DeliveryOptions;
Expand Down Expand Up @@ -61,7 +63,7 @@ public void requestHandler(
String apiPath = getApiPath(routeName, apiName, RouteMethod.REQUEST);
vertxEventBus.consumer(apiPath, message -> {
ClusterMessage msg = JsonMapper.jsonDeserialize((String) message.body(), ClusterMessage.class);
log.debug("Received msg via - request({}, {})", apiPath, msg.getId());
log.info("Received msg via - request({}, {})", apiPath, msg.getId());
try {
handler.handle(msg).thenAccept(response -> message.reply(
JsonMapper.jsonSerialize(response),
Expand Down Expand Up @@ -90,7 +92,18 @@ public void requestHandler(
}

public void publishHandler(String routeName, String apiName, MsgHandler handler) {
throw new NotImplementedException("handlePublish not implemented");
// TODO(rl): not sure if publishHandler should do anything different as compared to sendHandler except sending reply
String apiPath = getApiPath(routeName, apiName, RouteMethod.PUBLISH);
vertxEventBus.consumer(apiPath, message -> {
ClusterMessage msg = JsonMapper.jsonDeserialize((String) message.body(), ClusterMessage.class);
log.debug("Received msg via - publish({}, {})", apiPath, msg.getId());
try {
// this is async invocation.
handler.handle(msg);
} catch (Exception e) {
log.error("publish handler.handle({}) Unhandled exception: {}", message.body(), e.getMessage());
}
});
}

private String getApiPath(String routeName, String apiName, RouteMethod method) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.flipkart.varadhi.qos;

public interface RateLimiter {

// assumes 1 QPS
// for batch request (to be considered as multi QPS), consider creating a new API
boolean isAllowed(Double value);
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.flipkart.varadhi.qos;

import com.google.common.util.concurrent.AtomicDouble;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TopicRateLimiter implements RateLimiter {

@Getter
private final String topic;
@Getter
private final String name;
private volatile double suppressionFactor;
private AtomicDouble lastObserved;
private AtomicDouble currentObserved;
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved

public TopicRateLimiter(String topic, String name) {
this.topic = topic;
this.name = name;
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
this.suppressionFactor = 0;
lastObserved = new AtomicDouble(0.0);
currentObserved = new AtomicDouble(0.0);
}

@Override
public boolean isAllowed(Double value) {
currentObserved.addAndGet(value);
if(suppressionFactor == 0) {
return true;
}
return currentObserved.get() <= lastObserved.get()*(1-suppressionFactor);
Copy link
Contributor

@gaurav-ashok gaurav-ashok Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this formula?
i would have expected isAllowed = !(random.nextDouble(0, 1) < spuppressionFactor).

Copy link
Contributor

@gaurav-ashok gaurav-ashok Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current formula will block everything once your condition becomes true. I am also not sure why lastObserved data point is being used here!

}

public void setSuppressionFactor(double suppressionFactor) {
lastObserved.set(currentObserved.get());
// reset current observed
currentObserved.set(0.0);
this.suppressionFactor = suppressionFactor;
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.flipkart.varadhi.qos.client;

import com.flipkart.varadhi.cluster.MessageExchange;
import com.flipkart.varadhi.qos.RateLimiter;
import com.flipkart.varadhi.qos.TopicRateLimiter;
import com.flipkart.varadhi.utils.HostUtils;

import com.flipkart.varadhi.verticles.webserver.TrafficAggregator;
import java.net.UnknownHostException;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class RateLimiterService {

/**
* Map of topic to rate limiters (different rate limiters for QPS, Throughput etc)
*/
Map<String, List<RateLimiter>> topicRateLimiters;
TrafficAggregator trafficAggregator;


public RateLimiterService(MessageExchange exchange, int frequency, boolean requireHostName)
throws UnknownHostException {
topicRateLimiters = new HashMap<>();
trafficAggregator = new TrafficAggregator(exchange, HostUtils.getHostNameOrAddress(requireHostName), frequency, this);
}

public List<RateLimiter> getRateLimiter(String topic) {
if (!topicRateLimiters.containsKey(topic)) {
topicRateLimiters.put(
topic,
List.of(
new TopicRateLimiter(topic, "throughput_check"),
new TopicRateLimiter(topic, "qps_check")
)
);
}
return topicRateLimiters.get(topic);
}

public void updateSuppressionFactor(String topic, String type, Double suppressionFactor) {
log.info("Updating suppression factor for topic: {}", topic);
getRateLimiter(topic).forEach(rl -> {
if(rl instanceof TopicRateLimiter trl) {
if(trl.getTopic().equals(topic) && trl.getName().equals(type)) {
log.info("Setting SF for topic: {}, factor: {}, rl: {}", topic, suppressionFactor, trl.getName());
trl.setSuppressionFactor(suppressionFactor);
}
}
});
}

public boolean isAllowed(String topic, Double throughput) {
trafficAggregator.addTopicUsage(topic, throughput.longValue(), 1);
return getRateLimiter(topic).stream().allMatch(rl -> {
boolean allowed = rl.isAllowed(throughput);
return allowed;
});
}



}
59 changes: 59 additions & 0 deletions server/src/main/java/com/flipkart/varadhi/qos/entity/LoadInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.flipkart.varadhi.qos.entity;


import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.util.Map;

@Getter
@Setter
@AllArgsConstructor
public class LoadInfo implements Comparable<LoadInfo> {
// current millis
private String clientId;
private long from;
private long to;
private Map<String, TrafficData> topicUsageMap; // topic to incoming traffic map
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public String toString() {
return "\nLoadInfo{" +
"clientId=" + clientId +
"from=" + from +
", to=" + to +
", topicUsageMap=" + topicUsageMap +
"}";
}

@Override
public int compareTo(LoadInfo loadInfo) {
return compare(this, loadInfo);
}

public static int compare(@NotNull LoadInfo x, @NotNull LoadInfo y) {

if(x.clientId != null && y.clientId != null && x.clientId.compareTo(y.clientId) == 0) {
return 0;
}

long x_from = x.from;
long x_to = x.to;
long y_from = y.from;
long y_to = y.to;

int c1 = Long.compare(x_from, y_from);
if (c1 != 0) {
return c1;
}
int c2 = Long.compare(x_to, y_to);
if (c2 != 0) {
return c2;
}
// we never says that two LoadInfo objects are equal unless their UUID matches
return 1;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.flipkart.varadhi.qos.entity;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.databind.util.Converter;
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Suppression data for rate limiting. Stores suppression factor for each topic.
*/
@Getter
@Setter
public class SuppressionData<T> {
@JsonDeserialize(converter = HashMapTransformer.class)
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, T> suppressionFactor;

public SuppressionData() {
this.suppressionFactor = new ConcurrentHashMap<>();
}

private static class HashMapTransformer implements Converter<Map<String, SuppressionFactor>, Map<String, SuppressionFactor>> {

@Override
public Map<String, SuppressionFactor> convert(Map<String, SuppressionFactor> map) {
return new HashMap<>(map);
}

@Override
public JavaType getInputType(TypeFactory typeFactory) {
return typeFactory.constructMapType(Map.class, String.class, SuppressionFactor.class);
}

@Override
public JavaType getOutputType(TypeFactory typeFactory) {
return typeFactory.constructMapType(Map.class, String.class, SuppressionFactor.class);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.flipkart.varadhi.qos.entity;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class SuppressionFactor {
Double throughputFactor;
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
Double qpsFactor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.flipkart.varadhi.qos.entity;

import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/**
* Single topic's traffic data
*/
@Getter
@Setter
@Builder
public class TrafficData implements Comparable<TrafficData> {
// TODO(rl): make them adders
Long throughputIn;
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
Long rateIn;

@Override
public String toString() {
return "\nTrafficData{" +
"throughputIn=" + throughputIn +
", rateIn=" + rateIn +
"}";
}

@Override
public int compareTo(TrafficData trafficData) {
return compare(this, trafficData);
}

public static int compare(@NotNull TrafficData x, @NotNull TrafficData y) {
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
if (x.throughputIn < y.throughputIn) {
return -1;
}
if (x.throughputIn > y.throughputIn) {
return 1;
}
if (x.rateIn < y.rateIn) {
return -1;
}
return 1;
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading