Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT: Producer side rate limiting #163

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c81351a
feat: producer side rate limiting
Sep 8, 2024
e57761a
add RL based on factor in producer layer and fixes
Sep 9, 2024
58e9c4d
fix tests
Sep 16, 2024
1200509
minor cleanup and refactor
Sep 16, 2024
a1402ee
major refactor rate limiter
iosdev747 Sep 23, 2024
725d8a8
cleanup
iosdev747 Sep 23, 2024
cfac26e
bug fixes in RL wrt conversion of counter to rates
iosdev747 Sep 25, 2024
89db234
optimise and restructure RL impl
iosdev747 Oct 5, 2024
77a748b
cleanup TODOs
iosdev747 Oct 5, 2024
71ff939
add NFR simulations and RL metrics
iosdev747 Oct 12, 2024
2e5ac88
Merge remote-tracking branch 'upstream/master' into rate_limit
iosdev747 Oct 25, 2024
e800581
add RL simulation test and refactor
iosdev747 Oct 25, 2024
ab7dd60
normalize error and decouple get SF call
iosdev747 Oct 29, 2024
08e2aa6
cleanup
iosdev747 Oct 29, 2024
b00d515
cleanup
iosdev747 Oct 29, 2024
752ca74
Refactor RL interfaces, add testcases and cleanup
iosdev747 Nov 6, 2024
4cdc0ba
merge upstream/master
iosdev747 Nov 6, 2024
10b0aa3
revert test changes
iosdev747 Nov 6, 2024
df60a01
rename package ratelimiter
iosdev747 Nov 6, 2024
d4f6df9
minor refactor
iosdev747 Nov 11, 2024
9746a5b
rename RL module
iosdev747 Nov 11, 2024
399ec5a
move classes to records
iosdev747 Nov 11, 2024
00a9dd3
merging master
gaurav-ashok Nov 12, 2024
bcf595c
remove server dependency from controller
iosdev747 Nov 13, 2024
e857f8c
Merge remote-tracking branch 'origin/rate_limit' into rate_limit
iosdev747 Nov 13, 2024
a1c5ea0
add tests and decouple metrics collection
iosdev747 Nov 20, 2024
bd30f66
update RL unit tests
iosdev747 Nov 20, 2024
db4a394
merge upstream/master
iosdev747 Nov 20, 2024
f92a292
fix tests
iosdev747 Nov 20, 2024
3668174
test debug statements
iosdev747 Nov 20, 2024
4202e4e
Revert "test debug statements"
iosdev747 Nov 20, 2024
68762e0
fix flaky test
iosdev747 Nov 20, 2024
03c11fc
remove simulations from test exec
iosdev747 Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .run/Dev, start pulsar and zk.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<option name="envFilePath" value="" />
<option name="profiles">
<list>
<option value="dev" />
<option value="dev-metric" />
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
</list>
</option>
<option name="removeOrphansOnComposeDown" value="false" />
Expand Down
16 changes: 16 additions & 0 deletions rateLimiter/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
plugins {
id 'com.flipkart.varadhi.java-library-conventions'
// id 'java-test-fixtures'
}

dependencies {
// api(project(':common'))
// api(project(':spi'))
// implementation("io.vertx:vertx-core:${vertx_version}")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("org.apache.commons:commons-lang3")
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
// implementation("com.google.guava:guava")

// testFixturesImplementation(project(":common"))
// testImplementation(project(':pulsar'))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.flipkart.varadhi.qos;

import com.flipkart.varadhi.qos.entity.RateLimiterType;

public interface RateLimiter {
RateLimiterType getType();
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved

// assumes 1 QPS
// for batch request (to be considered as multi QPS), consider creating a new API
boolean isAllowed(long value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.flipkart.varadhi.qos;

import com.flipkart.varadhi.qos.entity.RateLimiterType;

import java.util.concurrent.atomic.LongAdder;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TopicRateLimiter implements RateLimiter {

@Getter
private final String topic;
@Getter
private final RateLimiterType type;
@Getter
private volatile double suppressionFactor;
private long lastObserved;
private final LongAdder currentObserved;

public TopicRateLimiter(String topic, RateLimiterType type) {
this.topic = topic;
this.type = type;
this.suppressionFactor = 0;
this.lastObserved = 0L;
this.currentObserved = new LongAdder();
}

@Override
public boolean isAllowed(long value) {
this.currentObserved.add(value);
if (this.suppressionFactor == 0) {
return true;
}
return this.currentObserved.longValue() <= this.lastObserved * (1 - this.suppressionFactor);
}

public void setSuppressionFactor(double suppressionFactor) {
this.lastObserved = this.currentObserved.longValue();
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
// remove last recorded value
this.currentObserved.add(-lastObserved);
this.suppressionFactor = suppressionFactor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.flipkart.varadhi.qos.entity;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class ClientHistory<T> {
Map<String, Deque<T>> clientHistoryMap; // clientId to history records
private final int slots;

public ClientHistory(int historySlots) {
this.clientHistoryMap = new ConcurrentHashMap<>();
this.slots = historySlots;
}

public T getRecentRecord(String clientId) {
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
var load = clientHistoryMap.get(clientId);
if(ObjectUtils.isEmpty(load)) {
return null;
}
return load.peekLast();
}

public void add(String clientId, T load) {
// first time for client, create a new history queue
if(!clientHistoryMap.containsKey(clientId)) {
addClient(clientId);
// // other option to throw exception and handle client addition somewhere else
// throw new IllegalArgumentException("Client not found");
}

// if history queue is full, remove the oldest data
if(clientHistoryMap.get(clientId).size() == slots)
clientHistoryMap.get(clientId).poll();

// add new data
clientHistoryMap.get(clientId).add(load);
}

public void addClient(String clientId) {
clientHistoryMap.put(clientId, new ArrayDeque<>(slots));
}

// TODO(rl): remove not active clients
public void removeClient(String clientId) {
clientHistoryMap.remove(clientId);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.flipkart.varadhi.qos.entity;


import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class ClientLoadInfo {
// current millis
private String clientId;
private long from;
private long to;
private List<TrafficData> topicUsageList; // topic to incoming traffic map
Copy link
Collaborator

@gauravAshok gauravAshok Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are sending topic names every second. What are the throughout requirements from each producer to controller?


@Override
public String toString() {
return "\nLoadInfo{" +
"clientId='" + clientId + '\'' +
", from=" + from +
", to=" + to +
", topicUsageMap=" + topicUsageList +
"}\n";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.flipkart.varadhi.qos.entity;

public enum RateLimiterType {
THROUGHPUT_CHECK,
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
QPS_CHECK
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flipkart.varadhi.qos.entity;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
* Suppression data for rate limiting. Stores suppression factor for each topic.
*/
@Getter
@Setter
public class SuppressionData implements Serializable {
iosdev747 marked this conversation as resolved.
Show resolved Hide resolved
private Map<String, SuppressionFactor> suppressionFactor = new HashMap<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here. using topic names to send data from controller to each producer.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.flipkart.varadhi.qos.entity;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
* Suppression factor for a topic. Factor must be between 0 and 1. 0 means no suppression, 1 means full suppression.
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class SuppressionFactor {
private double throughputFactor;
// can be extended to support suppression factor for QPS, custom measurements etc.
// double qpsFactor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.flipkart.varadhi.qos.entity;


import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class TopicLoadInfo {
// current millis
private String clientId; // todo(rl): maybe optional?
private long from;
private long to;
private TrafficData topicLoad; // topic to incoming traffic map

@Override
public String toString() {
return "\nTopicLoadInfo{" +
"clientId='" + clientId + '\'' +
", from=" + from +
", to=" + to +
", topicLoad=" + topicLoad +
"}\n";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.flipkart.varadhi.qos.entity;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

/**
* Single topic's traffic data
*/
@Getter
@Setter
@Builder
// TODO(rl): not in RL module, move to entity module
public class TrafficData {
private String topic;
private long bytesIn;
private long rateIn;

@Override
public String toString() {
return "\nTrafficData{" +
"throughputIn=" + bytesIn +
", rateIn=" + rateIn +
"}";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.flipkart.varadhi.qos.weights;

public class ConstantWeightFunction implements WeightFunction {
@Override
public float applyWeight(long time, long currentTime, long windowSize) {
if(time < (currentTime - windowSize)) {
return 0.0f;
}
return 1.0f;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.flipkart.varadhi.qos.weights;

public class ExponentialWeightFunction implements WeightFunction {
private final float k;

public ExponentialWeightFunction(float k) {
this.k = k;
}

@Override
public float applyWeight(long time, long currentTime, long windowSize) {
if(time < (currentTime - windowSize)) {
return 0.0f;
}
return 1.0f - (float) Math.exp(-k * (time - (currentTime - windowSize)) / windowSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.flipkart.varadhi.qos.weights;

public class LinearWeightFunction implements WeightFunction {
@Override
public float applyWeight(long time, long currentTime, long windowSize) {
if(time < (currentTime - windowSize)) {
return 0.0f;
}
return (float) (time - (currentTime - windowSize)) / windowSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.flipkart.varadhi.qos.weights;

public interface WeightFunction {
float applyWeight(long time, long currentTime, long windowSize);
}
14 changes: 14 additions & 0 deletions rateLimiter/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module com.flipkart.varadhi.rateLimiter {
requires static lombok;
requires org.slf4j;
// requires com.google.common;
requires org.apache.commons.lang3;
requires jakarta.validation;
requires com.fasterxml.jackson.databind;

// requires com.flipkart.varadhi.common;
// requires com.flipkart.varadhi.spi;
// requires com.flipkart.varadhi.entities;

exports com.flipkart.varadhi.qos.entity;
}
1 change: 1 addition & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies {
implementation(project(":authz"))
implementation(project(":controller"))
implementation(project(":consumer"))
implementation(project(':rateLimiter'))

implementation("org.apache.logging.log4j:log4j-slf4j2-impl")
implementation("org.apache.logging.log4j:log4j-core")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.flipkart.varadhi.cluster.messages.ClusterMessage;
import com.flipkart.varadhi.cluster.messages.ResponseMessage;
import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.flipkart.varadhi.utils.JsonMapper;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
Expand Down Expand Up @@ -48,7 +47,13 @@ public CompletableFuture<Void> send(String routeName, String apiName, ClusterMes
}

public void publish(String routeName, String apiName, ClusterMessage msg) {
throw new NotImplementedException("publish not implemented");
String apiPath = getPath(routeName, apiName, RouteMethod.PUBLISH);
try {
vertxEventBus.publish(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions);
log.debug("publish({}, {}) delivered.", apiPath, msg.getId());
} catch (Exception e) {
log.error("publish({}, {}) Unexpected failure:{}", apiPath, msg.getId(), e.getMessage());
}
}

public CompletableFuture<ResponseMessage> request(String routeName, String apiName, ClusterMessage msg) {
Expand All @@ -57,7 +62,7 @@ public CompletableFuture<ResponseMessage> request(String routeName, String apiNa
try {
vertxEventBus.request(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions, ar -> {
if (ar.succeeded()) {
log.debug("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body());
log.info("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body());
ResponseMessage response =
JsonMapper.jsonDeserialize((String) ar.result().body(), ResponseMessage.class);
if (response.getException() != null) {
Expand Down
Loading
Loading