diff --git a/.run/Start pulsar and zk (dev mode).run.xml b/.run/Start pulsar and zk (dev mode).run.xml
index 3607e4b9..4037fc67 100644
--- a/.run/Start pulsar and zk (dev mode).run.xml
+++ b/.run/Start pulsar and zk (dev mode).run.xml
@@ -14,4 +14,4 @@
-
\ No newline at end of file
+
diff --git a/common/src/main/java/com/flipkart/varadhi/utils/FutureUtil.java b/common/src/main/java/com/flipkart/varadhi/utils/FutureUtil.java
new file mode 100644
index 00000000..d4868874
--- /dev/null
+++ b/common/src/main/java/com/flipkart/varadhi/utils/FutureUtil.java
@@ -0,0 +1,35 @@
+package com.flipkart.varadhi.utils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+/**
+ * This class is aimed at simplifying work with {@code CompletableFuture}.
+ */
+public class FutureUtil {
+
+ /**
+ * Return a future that represents the completion of the futures in the provided Collection.
+ *
+ * @param futures futures to wait for
+ * @return a new CompletableFuture that is completed when all the given CompletableFutures complete
+ */
+ public static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
+ if (futures == null || futures.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ }
+
+ public static <T> CompletableFuture<List<T>> waitForAll(Stream<CompletableFuture<List<T>>> futures) {
+ return futures.reduce(CompletableFuture.completedFuture(new ArrayList<>()),
+ (pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {
+ preV.addAll(currV);
+ return preV;
+ })));
+ }
+
+}
diff --git a/controller/build.gradle b/controller/build.gradle
index bb369d83..5b3c714c 100644
--- a/controller/build.gradle
+++ b/controller/build.gradle
@@ -6,7 +6,9 @@ dependencies {
api(project(":entities"))
api(project(":spi"))
api(project(":core"))
+ api(project(':rate-limiter'))
+ implementation(project(":common"))
implementation("io.vertx:vertx-micrometer-metrics")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("org.apache.commons:commons-lang3")
diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/DistributedRateLimiterImpl.java b/controller/src/main/java/com/flipkart/varadhi/controller/DistributedRateLimiterImpl.java
new file mode 100644
index 00000000..99be3fb0
--- /dev/null
+++ b/controller/src/main/java/com/flipkart/varadhi/controller/DistributedRateLimiterImpl.java
@@ -0,0 +1,81 @@
+package com.flipkart.varadhi.controller;
+
+import com.flipkart.varadhi.core.capacity.TopicCapacityService;
+import com.flipkart.varadhi.qos.DistributedRateLimiter;
+import com.flipkart.varadhi.qos.entity.ClientHistory;
+import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
+import com.flipkart.varadhi.qos.entity.SuppressionData;
+import com.flipkart.varadhi.qos.entity.SuppressionFactor;
+import com.flipkart.varadhi.qos.entity.TopicLoadInfo;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.mutable.MutableDouble;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class DistributedRateLimiterImpl implements DistributedRateLimiter {
+ private Map<String, ClientHistory> topicTrafficDataMap; // topic to client load info
+ private final int windowSlots;
+ private final int singleSlotMillis;
+ private final TopicCapacityService topicCapacityService;
+ private final Clock clock;
+
+ public DistributedRateLimiterImpl(int windowSlots, int singleSlotMillis, TopicCapacityService topicCapacityService, Clock clock) {
+ this.topicTrafficDataMap = new ConcurrentHashMap<>();
+ this.windowSlots = windowSlots;
+ this.singleSlotMillis = singleSlotMillis;
+ this.topicCapacityService = topicCapacityService;
+ this.clock = clock;
+ }
+
+ // Adds throughput for current client and returns the updated suppression factor for the topic
+ // TODO(rl): add NFR tests
+ private SuppressionFactor addTrafficData(String clientId, TopicLoadInfo topicLoadInfo) {
+ // check if clientId is already present in the list
+ String topic = topicLoadInfo.topicLoad().topic();
+ MutableDouble actualThroughout = new MutableDouble(0.0);
+ topicTrafficDataMap.compute(topic, (k, v) -> {
+ if (v == null) {
+ v = new ClientHistory(windowSlots, singleSlotMillis, clock);
+ }
+ v.add(clientId, topicLoadInfo);
+ actualThroughout.setValue(getThroughput(v));
+ return v;
+ });
+ int throughputBPS = topicCapacityService.getThroughputLimit(topic);
+ return new SuppressionFactor(calculateSuppressionFactor(throughputBPS, actualThroughout.getValue()));
+ }
+
+ // TODO(rl): remove client from here
+
+ private Double calculateSuppressionFactor(double limit, double actual) {
+ return Math.max(0, 1.0 - (limit / actual));
+ }
+
+ private Double getThroughput(ClientHistory clientsHistory) {
+ double totalThroughput = 0.0;
+ List<TopicLoadInfo> records = clientsHistory.predictLoad();
+ for(TopicLoadInfo record: records){
+ double windowSizeInSeconds = (double) (record.to() - record.from()) / 1000;
+ totalThroughput += record.topicLoad().bytesIn() / windowSizeInSeconds;
+ }
+ return totalThroughput;
+ }
+
+ @Override
+ public SuppressionData addTrafficData(ClientLoadInfo info) {
+ SuppressionData suppressionData = new SuppressionData();
+ info.getTopicUsageList().forEach((trafficData) -> {
+ SuppressionFactor suppressionFactor = addTrafficData(
+ info.getClientId(),
+ new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
+ );
+ log.debug("Topic: {}, SF thr-pt: {}", trafficData.topic(), suppressionFactor.getThroughputFactor());
+ suppressionData.getSuppressionFactor().put(trafficData.topic(), suppressionFactor);
+ });
+ return suppressionData;
+ }
+}
diff --git a/core/src/main/java/com/flipkart/varadhi/core/capacity/TopicCapacityService.java b/core/src/main/java/com/flipkart/varadhi/core/capacity/TopicCapacityService.java
new file mode 100644
index 00000000..837294c8
--- /dev/null
+++ b/core/src/main/java/com/flipkart/varadhi/core/capacity/TopicCapacityService.java
@@ -0,0 +1,10 @@
+package com.flipkart.varadhi.core.capacity;
+
+public interface TopicCapacityService {
+ /**
+ * Get the throughput for a topic
+ * @param topic varadhi topic name
+ * @return throughput in bytes per second
+ */
+ int getThroughputLimit(String topic);
+}
diff --git a/rate-limiter/build.gradle b/rate-limiter/build.gradle
new file mode 100644
index 00000000..57712a86
--- /dev/null
+++ b/rate-limiter/build.gradle
@@ -0,0 +1,7 @@
+plugins {
+ id 'com.flipkart.varadhi.java-library-conventions'
+}
+
+dependencies {
+ implementation("io.micrometer:micrometer-core")
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/DistributedRateLimiter.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/DistributedRateLimiter.java
new file mode 100644
index 00000000..74d01e5b
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/DistributedRateLimiter.java
@@ -0,0 +1,12 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
+import com.flipkart.varadhi.qos.entity.SuppressionData;
+
+/**
+ * DistributedRateLimiter interface that takes in loadInfo from all the clients and returns the SuppressionData for each
+ * topic so that FactorRateLimiter can use it to limit the traffic.
+ */
+public interface DistributedRateLimiter extends RateLimiter<SuppressionData, ClientLoadInfo> {
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/FactorRateLimiter.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/FactorRateLimiter.java
new file mode 100644
index 00000000..836451a2
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/FactorRateLimiter.java
@@ -0,0 +1,5 @@
+package com.flipkart.varadhi.qos;
+
+public interface FactorRateLimiter extends RateLimiter<Boolean, Long> {
+ void updateSuppressionFactor(double suppressionFactor);
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/ProbabilisticTopicRateLimiter.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/ProbabilisticTopicRateLimiter.java
new file mode 100644
index 00000000..c14efbfd
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/ProbabilisticTopicRateLimiter.java
@@ -0,0 +1,31 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.RateLimiterType;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Getter
+@Slf4j
+public class ProbabilisticTopicRateLimiter implements FactorRateLimiter {
+
+ private final String topic;
+ private final RateLimiterType type;
+ private volatile double suppressionFactor;
+
+ public ProbabilisticTopicRateLimiter(String topic, RateLimiterType type) {
+ this.topic = topic;
+ this.type = type;
+ this.suppressionFactor = 0;
+ }
+
+ @Override
+ public Boolean addTrafficData(Long value) {
+ // generate random number between 0 and 1
+ return Math.random() > suppressionFactor;
+ }
+
+ @Override
+ public void updateSuppressionFactor(double suppressionFactor) {
+ this.suppressionFactor = suppressionFactor;
+ }
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiter.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiter.java
new file mode 100644
index 00000000..c373cad4
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiter.java
@@ -0,0 +1,5 @@
+package com.flipkart.varadhi.qos;
+
+public interface RateLimiter<Result, Param> {
+ Result addTrafficData(Param load);
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiterMetrics.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiterMetrics.java
new file mode 100644
index 00000000..93a91f68
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiterMetrics.java
@@ -0,0 +1,51 @@
+package com.flipkart.varadhi.qos;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RateLimiterMetrics {
+ private final MeterRegistry meterRegistry;
+ private final String clientId;
+ private final Map<String, TopicMetricsHandler> topicMetricsHandlers;
+ private final String acceptedThrptCounterName = "varadhi.producer.qos.allowedBytes";
+ private final String rejectedThrptCounterName = "varadhi.producer.qos.rateLimitedBytes";
+ private final String acceptedQPSCounterName = "varadhi.producer.qos.allowedQueries";
+ private final String rejectedQPSCounterName = "varadhi.producer.qos.rateLimitedQueries";
+ private final String suppressionFactorGaugeName = "varadhi.producer.qos.suppressionFactor";
+
+ public RateLimiterMetrics(MeterRegistry meterRegistry, String clientId) {
+ this.meterRegistry = meterRegistry;
+ this.clientId = clientId;
+ this.topicMetricsHandlers = new ConcurrentHashMap<>();
+ }
+
+
+ public void addSuccessRequest(String topic, long dataSize) {
+ TopicMetricsHandler metricsHandler = getTopicMetricsHandler(topic);
+ metricsHandler.getCounter(acceptedThrptCounterName).increment(dataSize);
+ metricsHandler.getCounter(acceptedQPSCounterName).increment();
+ }
+
+ public void addRateLimitedRequest(String topic, long dataSize) {
+ TopicMetricsHandler metricsHandler = getTopicMetricsHandler(topic);
+ metricsHandler.getCounter(rejectedThrptCounterName).increment(dataSize);
+ metricsHandler.getCounter(rejectedQPSCounterName).increment();
+ }
+
+ private TopicMetricsHandler getTopicMetricsHandler(String topic) {
+ return topicMetricsHandlers.compute(topic, (k, v) -> {
+ if (v == null) {
+ v = new TopicMetricsHandler(meterRegistry, clientId, topic);
+ }
+ return v;
+ });
+ }
+
+ public void registerSuppressionFactorGauge(String topic, TopicRateLimiter topicRateLimiter) {
+ TopicMetricsHandler metricsHandler = getTopicMetricsHandler(topic);
+ metricsHandler.getTopicRateLimiterGauge(suppressionFactorGaugeName, topicRateLimiter);
+ }
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiterService.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiterService.java
new file mode 100644
index 00000000..7cac46f6
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/RateLimiterService.java
@@ -0,0 +1,85 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.RateLimiterType;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+@Slf4j
+public class RateLimiterService {// todo(rl): should be an interface for update sf
+
+ /**
+ * Map of topic to rate limiters (different rate limiters for QPS, Throughput etc)
+ */
+ private final Map<String, List<FactorRateLimiter>> topicRateLimiters;
+ private final TrafficAggregator trafficAggregator;
+ private final RateLimiterMetrics rateLimiterMetrics;
+
+ public RateLimiterService(
+ DistributedRateLimiter distributedRateLimiter, RateLimiterMetrics rateLimiterMetrics, int frequency,
+ String clientId
+ )
+ throws UnknownHostException {
+ topicRateLimiters = new HashMap<>();
+ trafficAggregator = new TrafficAggregator(
+ clientId,
+ frequency,
+ distributedRateLimiter,
+ this,
+ Executors.newScheduledThreadPool(1)
+ );
+ this.rateLimiterMetrics = rateLimiterMetrics;
+ }
+
+ private List<FactorRateLimiter> getRateLimiter(String topic) {
+ if (!topicRateLimiters.containsKey(topic)) {
+ List<FactorRateLimiter> rateLimiters = List.of(new TopicRateLimiter(topic, RateLimiterType.THROUGHPUT));
+ topicRateLimiters.put(
+ topic,
+ rateLimiters
+ );
+ // register all the topic rate limiter to observe rate limit factors
+ rateLimiters.forEach(rl -> {
+ if (rl instanceof TopicRateLimiter trl) {
+ registerGauges(topic, trl);
+ }
+ });
+ }
+ return topicRateLimiters.get(topic);
+ }
+
+ public void updateSuppressionFactor(String topic, Double suppressionFactor) {
+ log.debug("Updating suppression factor for topic: {}", topic);
+ getRateLimiter(topic).forEach(rl -> {
+ if (rl instanceof TopicRateLimiter trl) {
+ if (trl.getTopic().equals(topic)) {
+ log.debug("Setting SF for topic: {}, factor: {}, rl: {}", topic, suppressionFactor, trl.getType());
+ rl.updateSuppressionFactor(suppressionFactor);
+ }
+ }
+ });
+ }
+
+ public boolean isAllowed(String topic, long dataSize) {
+ trafficAggregator.addTopicUsage(topic, dataSize);
+ // get all the rate limiters for given topic and check if all of them allow the request
+ return getRateLimiter(topic).stream().allMatch(rl -> {
+ boolean allowed = rl.addTrafficData(dataSize);
+ if (allowed) {
+ rateLimiterMetrics.addSuccessRequest(topic, dataSize);
+ } else {
+ rateLimiterMetrics.addRateLimitedRequest(topic, dataSize);
+ }
+ return allowed;
+ });
+ }
+
+ private void registerGauges(String topic, TopicRateLimiter topicRateLimiter) {
+ rateLimiterMetrics.registerSuppressionFactorGauge(topic, topicRateLimiter);
+ }
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TopicMetricsHandler.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TopicMetricsHandler.java
new file mode 100644
index 00000000..8db28dda
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TopicMetricsHandler.java
@@ -0,0 +1,49 @@
+package com.flipkart.varadhi.qos;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TopicMetricsHandler {
+ private final MeterRegistry meterRegistry;
+ private final String clientId;
+ private final String topic;
+ private final Map<String, Counter> counterCache;
+ private final Map<String, Gauge> gaugeCache;
+
+ public TopicMetricsHandler(MeterRegistry meterRegistry, String clientId, String topic) {
+ this.meterRegistry = meterRegistry;
+ this.clientId = clientId;
+ this.topic = topic;
+ this.counterCache = new ConcurrentHashMap<>();
+ this.gaugeCache = new ConcurrentHashMap<>();
+ }
+
+ public Counter getCounter(String metricName) {
+ return counterCache.compute(metricName, (k, v) -> {
+ if (v == null) {
+ v = Counter.builder(metricName)
+ .tag("topic", topic)
+ .tag("client", clientId)
+ .register(meterRegistry);
+ }
+ return v;
+ });
+ }
+
+ public Gauge getTopicRateLimiterGauge(String metricName, TopicRateLimiter topicRateLimiter) {
+ return gaugeCache.compute(metricName, (k, v) -> {
+ if (v == null) {
+ v = Gauge.builder(metricName, topicRateLimiter, TopicRateLimiter::getSuppressionFactor)
+ .tag("topic", topic)
+ .tag("client", clientId)
+ .register(meterRegistry);
+ }
+ return v;
+ });
+ }
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TopicRateLimiter.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TopicRateLimiter.java
new file mode 100644
index 00000000..0372c771
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TopicRateLimiter.java
@@ -0,0 +1,47 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.RateLimiterType;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TopicRateLimiter implements FactorRateLimiter {
+
+ @Getter
+ private final String topic;
+ @Getter
+ private final RateLimiterType type;
+ @Getter
+ private volatile double suppressionFactor;
+ private long lastObserved;
+ private final LongAdder currentObserved;
+
+ public TopicRateLimiter(String topic, RateLimiterType type) {
+ this.topic = topic;
+ this.type = type;
+ this.suppressionFactor = 0;
+ this.lastObserved = 0L;
+ this.currentObserved = new LongAdder();
+ }
+
+ @Override
+ public Boolean addTrafficData(Long value) {
+ currentObserved.add(value);
+ // todo(rl): allows spikes, need to consider a better way to handle spikes
+ if (suppressionFactor == 0 || lastObserved == 0L) {
+ return true;
+ }
+ return currentObserved.longValue() <= lastObserved * (1 - suppressionFactor);
+ }
+
+ @Override
+ public void updateSuppressionFactor(double suppressionFactor) {
+ lastObserved = currentObserved.longValue();
+ // remove last recorded value
+ currentObserved.add(-lastObserved);
+ this.suppressionFactor = suppressionFactor;
+ }
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TrafficAggregator.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TrafficAggregator.java
new file mode 100644
index 00000000..d80a62ce
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/TrafficAggregator.java
@@ -0,0 +1,115 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
+import com.flipkart.varadhi.qos.entity.SuppressionData;
+import com.flipkart.varadhi.qos.entity.TrafficData;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * This class will capture incoming traffic usage by providing a method to add topic usage by producers.
+ * This class will accumulate everything and send it to the controller for further processing for rate limiting.
+ * Post that, reset the usage for the topic.
+ */
+@Slf4j
+public class TrafficAggregator {
+
+ private final ClientLoadInfo loadInfo;
+ private final int frequency;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final DistributedRateLimiter distributedRateLimiter;
+ private final RateLimiterService rateLimiterService;
+ private final Map<String, ConcurrentTopicData> topicTrafficMap;
+
+ // Inner class to aggregate topic data before sending to controller
+ static class ConcurrentTopicData {
+ private final String topic;
+ private final LongAdder bytesIn;
+ private final LongAdder rateIn;
+
+ public ConcurrentTopicData(String topic) {
+ this.topic = topic;
+ this.bytesIn = new LongAdder();
+ this.rateIn = new LongAdder();
+ }
+ }
+
+ public TrafficAggregator(
+ String clientId, int frequency, DistributedRateLimiter distributedRateLimiter, RateLimiterService rateLimiterService,
+ ScheduledExecutorService scheduledExecutorService
+ ) {
+ this.frequency = frequency;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.distributedRateLimiter = distributedRateLimiter;
+ this.rateLimiterService = rateLimiterService;
+ this.loadInfo = new ClientLoadInfo(clientId, 0,0, new ArrayList<>());
+ this.topicTrafficMap = new ConcurrentHashMap<>();
+ sendUsageToController();
+ }
+
+ public void addTopicUsage(String topic, long dataSize, long queries) {
+ topicTrafficMap.compute(topic, (k, v) -> {
+ if (v == null) {
+ v = new ConcurrentTopicData(topic);
+ }
+ v.rateIn.add(queries);
+ v.bytesIn.add(dataSize);
+ return v;
+ });
+ }
+
+ // Overloaded method to add topic usage for single request
+ public void addTopicUsage(String topic, long dataSize) {
+ addTopicUsage(topic, dataSize, 1);
+ }
+
+ private void sendUsageToController() {
+ // todo(rl): explore scheduleWithFixedDelay
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ sendTrafficUsageAndUpdateSuppressionFactor();
+ } catch (Exception e) {
+ log.error("Error while sending usage to controller", e);
+ }
+ }, frequency, frequency, TimeUnit.SECONDS);
+
+ }
+
+ private void sendTrafficUsageAndUpdateSuppressionFactor() {
+ long currentTime = System.currentTimeMillis();
+ loadInfo.setTo(currentTime);
+ // convert ConcurrentTopicData to TrafficData.list
+ topicTrafficMap.forEach((topic, data) -> loadInfo.getTopicUsageList()
+ .add(new TrafficData(topic, data.bytesIn.sum(), data.rateIn.sum())));
+ log.debug("Sending traffic data to controller: {}", loadInfo);
+ // TODO(rl); simulate add delay for degradation;
+ SuppressionData suppressionData = distributedRateLimiter.addTrafficData(loadInfo);
+ applySuppressionFactors(suppressionData);
+ resetData(currentTime);
+ }
+
+ private void applySuppressionFactors(SuppressionData suppressionData) {
+ suppressionData.getSuppressionFactor().forEach(
+ (topic, suppressionFactor) -> rateLimiterService.updateSuppressionFactor(topic,
+ suppressionFactor.getThroughputFactor()
+ ));
+ }
+
+ private void resetData(long time) {
+ loadInfo.setFrom(time);
+ // remove snapshot from current aggregated data
+ loadInfo.getTopicUsageList().forEach(trafficData -> {
+ topicTrafficMap.get(trafficData.topic()).bytesIn.add(-trafficData.bytesIn());
+ topicTrafficMap.get(trafficData.topic()).rateIn.add(-trafficData.rateIn());
+ });
+ // reset snapshot
+ loadInfo.getTopicUsageList().clear();
+ }
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/ClientHistory.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/ClientHistory.java
new file mode 100644
index 00000000..75a0cd81
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/ClientHistory.java
@@ -0,0 +1,81 @@
+package com.flipkart.varadhi.qos.entity;
+
+import java.time.Clock;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+public class ClientHistory implements LoadPredictor {
+ private final Map<String, Deque<TopicLoadInfo>> clientHistoryMap; // clientId to history records
+ private final int slots;
+ private final int slotDuration;
+ private final Clock clock;
+
+ public ClientHistory(int historySlots, int slotDuration, Clock clock) {
+ if(historySlots <= 0 || slotDuration <= 0) {
+ throw new IllegalArgumentException("Invalid history slots or slot duration");
+ }
+ this.clientHistoryMap = new ConcurrentHashMap<>();
+ this.slots = historySlots;
+ this.slotDuration = slotDuration;
+ this.clock = clock;
+ }
+
+ @Override
+ public List predictLoad() {
+ List<TopicLoadInfo> recentRecords = new ArrayList<>();
+ clientHistoryMap.forEach((clientId, history) -> {
+ if(!history.isEmpty()) {
+ TopicLoadInfo topicLoadInfo = history.peekLast();
+ if(topicLoadInfo != null) {
+ if(!isExpired(topicLoadInfo.to())) {
+ recentRecords.add(topicLoadInfo);
+ } else {
+ // most recent record of given client is outside of window
+ removeClient(clientId);
+ }
+ }
+ }
+ });
+
+ // returns the best possible prediction of load
+ return recentRecords;
+ }
+
+ @Override
+ public void add(String clientId, TopicLoadInfo load) {
+ // first time for client, create a new history queue
+ clientHistoryMap.compute(clientId, (k, v) -> {
+ // add a new client
+ if (v == null) {
+ v = new ArrayDeque<>(slots);
+ }
+ if(v.size() == slots) {
+ v.poll();
+ }
+ v.add(load);
+ return v;
+ });
+ }
+
+ private void removeClient(String clientId) {
+ clientHistoryMap.remove(clientId);
+ }
+
+ /**
+ * check if record is expired (older than number of slots maintained)
+ * @param time record time
+ * @return true if record is older than windowSize
+ */
+ private boolean isExpired(long time) {
+ return (clock.millis() - (long) slots * slotDuration) > time;
+ }
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/ClientLoadInfo.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/ClientLoadInfo.java
new file mode 100644
index 00000000..864a6205
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/ClientLoadInfo.java
@@ -0,0 +1,30 @@
+package com.flipkart.varadhi.qos.entity;
+
+
+import java.util.List;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class ClientLoadInfo {
+ // current millis
+ private String clientId;
+ private long from;
+ private long to;
+ private List<TrafficData> topicUsageList;
+
+ @Override
+ public String toString() {
+ return "ClientLoadInfo{" +
+ "clientId='" + clientId + '\'' +
+ ", from=" + from +
+ ", to=" + to +
+ ", topicUsageList=" + topicUsageList +
+ '}';
+ }
+
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/LoadPredictor.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/LoadPredictor.java
new file mode 100644
index 00000000..fc948263
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/LoadPredictor.java
@@ -0,0 +1,25 @@
+package com.flipkart.varadhi.qos.entity;
+
+import java.util.List;
+
+/**
+ * The LoadPredictor interface provides a method to predict the topic load coming from different clients.
+ * Implementations of this interface should provide the logic to predict the load. (Eg based on historical data, most
+ * recent data, etc.)
+ */
+public interface LoadPredictor {
+
+ /**
+ * Add the load information of a topic for a given client;
+ * @param clientId unique clientId
+ * @param load topic load information
+ */
+ void add(String clientId, TopicLoadInfo load);
+
+ /**
+ * Predicts the load of topic from different clients.
+ *
+ * @return a list of TopicLoadInfo objects representing the predicted load of topic from different clients.
+ */
+ List<TopicLoadInfo> predictLoad();
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/RateLimiterType.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/RateLimiterType.java
new file mode 100644
index 00000000..9a3a4129
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/RateLimiterType.java
@@ -0,0 +1,5 @@
+package com.flipkart.varadhi.qos.entity;
+
+public enum RateLimiterType {
+ THROUGHPUT
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/SuppressionData.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/SuppressionData.java
new file mode 100644
index 00000000..cf8fd2f0
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/SuppressionData.java
@@ -0,0 +1,16 @@
+package com.flipkart.varadhi.qos.entity;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Suppression data for rate limiting. Stores suppression factor for each topic.
+ */
+@Getter
+@Setter
+public class SuppressionData {
+ private Map<String, SuppressionFactor> suppressionFactor = new HashMap<>();
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/SuppressionFactor.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/SuppressionFactor.java
new file mode 100644
index 00000000..6e94ad81
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/SuppressionFactor.java
@@ -0,0 +1,19 @@
+package com.flipkart.varadhi.qos.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Suppression factor for a topic. Factor must be between 0 and 1. 0 means no suppression, 1 means full suppression.
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class SuppressionFactor {
+ private double throughputFactor;
+ // can be extended to support suppression factor for QPS, custom measurements etc.
+ // double qpsFactor;
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/TopicLoadInfo.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/TopicLoadInfo.java
new file mode 100644
index 00000000..9e62e458
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/TopicLoadInfo.java
@@ -0,0 +1,9 @@
+package com.flipkart.varadhi.qos.entity;
+
+public record TopicLoadInfo(
+ String clientId, // todo(rl): maybe optional?
+ long from,
+ long to,
+ TrafficData topicLoad
+) {
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/TrafficData.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/TrafficData.java
new file mode 100644
index 00000000..b7bd8721
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/entity/TrafficData.java
@@ -0,0 +1,11 @@
+package com.flipkart.varadhi.qos.entity;
+
+/**
+ * Single topic's traffic data
+ */
+public record TrafficData (
+ String topic,
+ long bytesIn,
+ long rateIn
+){
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/ConstantWeightFunction.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/ConstantWeightFunction.java
new file mode 100644
index 00000000..96d5fe42
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/ConstantWeightFunction.java
@@ -0,0 +1,11 @@
+package com.flipkart.varadhi.qos.weights;
+
+public class ConstantWeightFunction implements WeightFunction {
+ @Override
+ public float applyWeight(long time, long currentTime, long windowSize) {
+ if(time < (currentTime - windowSize)) {
+ return 0.0f;
+ }
+ return 1.0f;
+ }
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/ExponentialWeightFunction.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/ExponentialWeightFunction.java
new file mode 100644
index 00000000..774a548f
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/ExponentialWeightFunction.java
@@ -0,0 +1,17 @@
+package com.flipkart.varadhi.qos.weights;
+
+public class ExponentialWeightFunction implements WeightFunction {
+ private final float k;
+
+ public ExponentialWeightFunction(float k) {
+ this.k = k;
+ }
+
+ @Override
+ public float applyWeight(long time, long currentTime, long windowSize) {
+ if(time < (currentTime - windowSize)) {
+ return 0.0f;
+ }
+ return 1.0f - (float) Math.exp(-k * (time - (currentTime - windowSize)) / windowSize);
+ }
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/LinearWeightFunction.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/LinearWeightFunction.java
new file mode 100644
index 00000000..e298faaa
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/LinearWeightFunction.java
@@ -0,0 +1,11 @@
+package com.flipkart.varadhi.qos.weights;
+
+public class LinearWeightFunction implements WeightFunction {
+ @Override
+ public float applyWeight(long time, long currentTime, long windowSize) {
+ if(time < (currentTime - windowSize)) {
+ return 0.0f;
+ }
+ return (float) (time - (currentTime - windowSize)) / windowSize;
+ }
+}
diff --git a/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/WeightFunction.java b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/WeightFunction.java
new file mode 100644
index 00000000..bf45bcbb
--- /dev/null
+++ b/rate-limiter/src/main/java/com/flipkart/varadhi/qos/weights/WeightFunction.java
@@ -0,0 +1,5 @@
+package com.flipkart.varadhi.qos.weights;
+
+/**
+ * Strategy for weighting a historical traffic sample when aggregating over a
+ * sliding time window. Implementations decide how strongly recency matters.
+ */
+public interface WeightFunction {
+    /**
+     * Returns the weight for a sample taken at {@code time}.
+     *
+     * @param time        timestamp of the sample (presumably epoch millis — TODO confirm unit)
+     * @param currentTime "now", used as the end of the window (same unit as {@code time})
+     * @param windowSize  window length in the same unit as the timestamps
+     * @return 0 for samples older than {@code currentTime - windowSize}; otherwise an
+     *         implementation-specific weight (implementations in this package stay in [0, 1])
+     */
+    float applyWeight(long time, long currentTime, long windowSize);
+}
diff --git a/rate-limiter/src/main/java/module-info.java b/rate-limiter/src/main/java/module-info.java
new file mode 100644
index 00000000..e09f7c0f
--- /dev/null
+++ b/rate-limiter/src/main/java/module-info.java
@@ -0,0 +1,6 @@
+module com.flipkart.varadhi.ratelimiter {
+    requires static lombok;
+    requires org.slf4j;
+    requires micrometer.core;
+
+    // The server and controller modules consume the limiter implementation types
+    // (RateLimiterService, DistributedRateLimiter, RateLimiterMetrics) and the
+    // weight functions, not just the entity package — export those packages too,
+    // otherwise modular consumers fail with "package is not exported".
+    exports com.flipkart.varadhi.qos;
+    exports com.flipkart.varadhi.qos.entity;
+    exports com.flipkart.varadhi.qos.weights;
+}
diff --git a/rate-limiter/src/test/java/com/flipkart/varadhi/qos/ProbabilisticTopicRateLimiterTests.java b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/ProbabilisticTopicRateLimiterTests.java
new file mode 100644
index 00000000..65f3fa45
--- /dev/null
+++ b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/ProbabilisticTopicRateLimiterTests.java
@@ -0,0 +1,60 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.RateLimiterType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Unit tests for {@code ProbabilisticTopicRateLimiter}: construction defaults,
+ * suppression-factor updates, and admission behaviour at factors 0, 1 and 0.5.
+ */
+public class ProbabilisticTopicRateLimiterTests {
+
+    private static final String TOPIC = "topic1";
+
+    private final RateLimiterType limiterType = RateLimiterType.THROUGHPUT;
+    private ProbabilisticTopicRateLimiter limiter;
+
+    @BeforeEach
+    public void setUp() {
+        limiter = new ProbabilisticTopicRateLimiter(TOPIC, limiterType);
+    }
+
+    @Test
+    public void testInitialization() {
+        // A fresh limiter keeps its identity and starts fully permissive.
+        assertEquals(TOPIC, limiter.getTopic());
+        assertEquals(limiterType, limiter.getType());
+        assertEquals(0, limiter.getSuppressionFactor());
+    }
+
+    @Test
+    public void testUpdateSuppressionFactor() {
+        limiter.updateSuppressionFactor(0.7);
+        assertEquals(0.7, limiter.getSuppressionFactor());
+    }
+
+    @Test
+    public void testZeroFactor() {
+        // Factor 0 means "never suppress": the request must be admitted.
+        limiter.updateSuppressionFactor(0);
+        assertTrue(limiter.addTrafficData(100L));
+    }
+
+    @Test
+    public void testOneFactor() {
+        // Factor 1 means "suppress everything": the request must be rejected.
+        limiter.updateSuppressionFactor(1);
+        assertFalse(limiter.addTrafficData(100L));
+    }
+
+    @Test
+    public void testIntermediateFactor() {
+        // With factor 0.5, over 1000 trials both outcomes should occur; the
+        // probability of an entirely one-sided run is negligible.
+        limiter.updateSuppressionFactor(0.5);
+        int admitted = 0;
+        int rejected = 0;
+        for (int attempt = 0; attempt < 1000; attempt++) {
+            if (limiter.addTrafficData(100L)) {
+                admitted++;
+            } else {
+                rejected++;
+            }
+        }
+        assertTrue(admitted > 0);
+        assertTrue(rejected > 0);
+    }
+}
diff --git a/rate-limiter/src/test/java/com/flipkart/varadhi/qos/RateLimiterServiceTests.java b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/RateLimiterServiceTests.java
new file mode 100644
index 00000000..cd31163b
--- /dev/null
+++ b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/RateLimiterServiceTests.java
@@ -0,0 +1,72 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.qos.entity.SuppressionData;
+import com.flipkart.varadhi.qos.entity.SuppressionFactor;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@code RateLimiterService} with a mocked distributed limiter and a
+ * mocked metrics sink: verifies suppression-factor updates and admission decisions.
+ */
+public class RateLimiterServiceTests {
+
+    // Cluster-side limiter; stubbed per-test where a suppression response is needed.
+    @Mock
+    private DistributedRateLimiter distributedRateLimiter;
+    // Metrics sink stubbed to no-ops so tests exercise only admission logic.
+    @Mock
+    private RateLimiterMetrics rateLimiterMetrics;
+    private RateLimiterService rateLimiterService;
+    // Reporting interval (presumably seconds — TODO confirm unit against RateLimiterService).
+    private final int frequency = 1;
+    private final String DEFAULT_CLIENT_ID = "client1";
+    private final String DEFAULT_TOPIC = "topic1";
+
+    @BeforeEach
+    public void setUp() throws UnknownHostException {
+        MockitoAnnotations.openMocks(this);
+        doNothing().when(rateLimiterMetrics).addSuccessRequest(anyString(), anyLong());
+        doNothing().when(rateLimiterMetrics).addRateLimitedRequest(anyString(), anyLong());
+        doNothing().when(rateLimiterMetrics).registerSuppressionFactorGauge(anyString(), any());
+        rateLimiterService = new RateLimiterService(distributedRateLimiter, rateLimiterMetrics, frequency, DEFAULT_CLIENT_ID);
+    }
+
+    @Test
+    public void testUpdateSuppressionFactor() {
+        // Factor 0.0: nothing is suppressed, requests pass.
+        rateLimiterService.updateSuppressionFactor(DEFAULT_TOPIC, 0.0);
+        long dataSize = 1000L;
+        assertTrue(rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize));
+        assertTrue(rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize));
+        // Factor 1.0: everything is suppressed, requests are rejected.
+        rateLimiterService.updateSuppressionFactor(DEFAULT_TOPIC, 1.0);
+        assertFalse(rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize));
+        assertFalse(rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize));
+    }
+
+    @Test
+    public void testIsAllowed() {
+        // With no suppression configured, a request is admitted by default.
+        long dataSize = 1000L;
+        boolean allowed = rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize);
+        assertTrue(allowed);
+        rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize);
+    }
+
+    @SneakyThrows
+    @Test
+    public void testIsNotAllowed() {
+        // Creates a traffic spike, then waits one interval so the mocked full-suppression
+        // response takes effect and the next request is rate limited.
+        long dataSize = 10000000L;
+        SuppressionData suppressionData = new SuppressionData();
+        suppressionData.setSuppressionFactor(new HashMap<>(Map.of(DEFAULT_TOPIC, new SuppressionFactor(1))));
+        // Mocked cluster limiter reports full suppression (factor 1) for the topic.
+        when(distributedRateLimiter.addTrafficData(any())).thenReturn(suppressionData);
+        rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize);
+        // NOTE(review): real sleep makes this timing-sensitive/flaky — consider an injectable clock/scheduler.
+        Thread.sleep(frequency * 1000);
+        boolean allowed = rateLimiterService.isAllowed(DEFAULT_TOPIC, dataSize);
+        assertFalse(allowed);
+    }
+
+}
diff --git a/rate-limiter/src/test/java/com/flipkart/varadhi/qos/TopicRateLimiterTests.java b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/TopicRateLimiterTests.java
new file mode 100644
index 00000000..2590adf9
--- /dev/null
+++ b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/TopicRateLimiterTests.java
@@ -0,0 +1,79 @@
+package com.flipkart.varadhi.qos;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.flipkart.varadhi.qos.entity.RateLimiterType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code TopicRateLimiter}: initialization defaults, suppression-factor
+ * updates, and admission decisions as the factor changes over time.
+ */
+public class TopicRateLimiterTests {
+
+    private TopicRateLimiter rateLimiter;
+    private final String topic = "test-topic";
+    private final RateLimiterType type = RateLimiterType.THROUGHPUT;
+
+    @BeforeEach
+    public void setUp() {
+        rateLimiter = new TopicRateLimiter(topic, type);
+    }
+
+    @Test
+    public void testInitialization() {
+        // Fresh limiter keeps its identity and starts with no suppression.
+        assertEquals(topic, rateLimiter.getTopic());
+        assertEquals(type, rateLimiter.getType());
+        assertEquals(0, rateLimiter.getSuppressionFactor());
+    }
+
+    @Test
+    public void testUpdateSuppressionFactor() {
+        rateLimiter.updateSuppressionFactor(0.7);
+        assertEquals(0.7, rateLimiter.getSuppressionFactor());
+    }
+
+    @Test
+    public void testZeroFactor() {
+        // Factor 0: admission is unaffected before and after the update.
+        assertTrue(rateLimiter.addTrafficData(100L));
+        rateLimiter.updateSuppressionFactor(0);
+        assertTrue(rateLimiter.addTrafficData(100L));
+    }
+
+    @Test
+    public void testOneFactor() {
+        // Factor 1: everything after the update is rejected.
+        assertTrue(rateLimiter.addTrafficData(100L));
+        rateLimiter.updateSuppressionFactor(1);
+        assertFalse(rateLimiter.addTrafficData(100L));
+    }
+
+    @Test
+    public void testIntermediateFactor() {
+        // Assumes suppression at 0.5 is applied relative to previously observed
+        // traffic, so the second post-update request exceeds the allowance —
+        // TODO confirm against TopicRateLimiter's implementation.
+        rateLimiter.addTrafficData(100L);
+        rateLimiter.updateSuppressionFactor(0.5);
+        rateLimiter.addTrafficData(100L);
+        assertFalse(rateLimiter.addTrafficData(100L));
+    }
+
+    @Test
+    public void testWithNoFactor() {
+        // No suppression configured: all traffic is admitted.
+        assertTrue(rateLimiter.addTrafficData(100L));
+        assertTrue(rateLimiter.addTrafficData(200L));
+    }
+
+    @Test
+    public void testWithFactor() {
+        // After halving, a small request fits the remaining allowance but a
+        // larger one does not (presumably relative to the prior 200L sample).
+        assertTrue(rateLimiter.addTrafficData(200L));
+        rateLimiter.updateSuppressionFactor(0.5);
+        assertTrue(rateLimiter.addTrafficData(100L));
+        assertFalse(rateLimiter.addTrafficData(200L));
+    }
+
+    @Test
+    public void testChangingFactor() {
+        // Raising the factor mid-stream tightens the allowance for later requests.
+        rateLimiter.updateSuppressionFactor(0.3);
+        rateLimiter.addTrafficData(100L);
+        assertTrue(rateLimiter.addTrafficData(50L));
+        rateLimiter.updateSuppressionFactor(0.7);
+        assertFalse(rateLimiter.addTrafficData(100L));
+    }
+}
diff --git a/rate-limiter/src/test/java/com/flipkart/varadhi/qos/entity/ClientHistoryTests.java b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/entity/ClientHistoryTests.java
new file mode 100644
index 00000000..d2c2ede4
--- /dev/null
+++ b/rate-limiter/src/test/java/com/flipkart/varadhi/qos/entity/ClientHistoryTests.java
@@ -0,0 +1,92 @@
+package com.flipkart.varadhi.qos.entity;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@code ClientHistory}: constructor validation, load prediction for
+ * one and many clients, and expiry of stale slots (verified via a mocked Clock
+ * rather than real sleeps).
+ */
+public class ClientHistoryTests {
+
+    private ClientHistory clientHistory;
+    // Fixed (then mocked) clock so slot expiry is deterministic.
+    private Clock clock;
+    private final int historySlots = 5;
+    private final int slotDuration = 1000; // 1 second
+    private final String DEFAULT_CLIENT_ID = "client1";
+    private final String DEFAULT_TOPIC = "topic1";
+
+    @BeforeEach
+    public void setUp() {
+        clock = Clock.fixed(Instant.now(), Clock.systemDefaultZone().getZone());
+        clientHistory = new ClientHistory(historySlots, slotDuration, clock);
+    }
+
+    @Test
+    public void testInitializationParams() {
+        // Empty history predicts nothing; non-positive slot counts/durations are rejected.
+        assertTrue(clientHistory.predictLoad().isEmpty());
+        assertThrows(IllegalArgumentException.class, () -> new ClientHistory(0, 1, clock));
+        assertThrows(IllegalArgumentException.class, () -> new ClientHistory(1, 0, clock));
+        assertDoesNotThrow(() -> new ClientHistory(1, 1, clock));
+    }
+
+    @Test
+    public void testAddAndPredictLoadForSingleClient() {
+        // A single fresh record should be echoed back verbatim by predictLoad().
+        long currentTime = Instant.now(clock).toEpochMilli();
+        TopicLoadInfo loadInfo = new TopicLoadInfo(DEFAULT_CLIENT_ID, currentTime, currentTime + slotDuration,
+                new TrafficData(DEFAULT_TOPIC, 100, 100)
+        );
+
+        clientHistory.add(DEFAULT_CLIENT_ID, loadInfo);
+        List predictedLoad = clientHistory.predictLoad();
+
+        assertEquals(1, predictedLoad.size());
+        assertEquals(loadInfo, predictedLoad.get(0));
+        assertEquals(DEFAULT_CLIENT_ID, predictedLoad.get(0).clientId());
+        assertEquals(currentTime, predictedLoad.get(0).from());
+        assertEquals(currentTime + slotDuration, predictedLoad.get(0).to());
+        assertEquals(DEFAULT_TOPIC, predictedLoad.get(0).topicLoad().topic());
+        assertEquals(100, predictedLoad.get(0).topicLoad().bytesIn());
+        assertEquals(100, predictedLoad.get(0).topicLoad().rateIn());
+    }
+
+    @Test
+    public void testAddMultipleLoads() {
+        // One record per distinct client: prediction returns one entry per client.
+        String clientId = "client";
+        long currentTime = Instant.now(clock).toEpochMilli();
+        for (int i = 0; i < historySlots; i++) {
+            TopicLoadInfo loadInfo = new TopicLoadInfo(clientId + i, currentTime,
+                    currentTime + slotDuration, new TrafficData(DEFAULT_TOPIC, 100, 100)
+            );
+            clientHistory.add(clientId + i, loadInfo);
+        }
+        List predictedLoad = clientHistory.predictLoad();
+        assertEquals(historySlots, predictedLoad.size());
+    }
+
+    @Test
+    public void testExpiredLoad() {
+        // mocking clock explicitly to remove usage of sleep
+        long currentTime = Instant.now(clock).toEpochMilli();
+        clock = mock(Clock.class);
+        clientHistory = new ClientHistory(historySlots, slotDuration, clock);
+
+        when(clock.millis()).thenReturn(currentTime);
+
+        TopicLoadInfo loadInfo = new TopicLoadInfo(DEFAULT_CLIENT_ID, currentTime, currentTime + slotDuration,
+                new TrafficData(DEFAULT_TOPIC, 100, 100)
+        );
+
+        clientHistory.add(DEFAULT_CLIENT_ID, loadInfo);
+        assertEquals(1, clientHistory.predictLoad().size());
+
+        // Advance "now" past the whole retention span so the record falls out of every slot.
+        when(clock.millis()).thenReturn(currentTime + (historySlots + 2) * slotDuration);
+        assertTrue(clientHistory.predictLoad().isEmpty());
+    }
+
+}
diff --git a/server/build.gradle b/server/build.gradle
index b3d9e8ef..a8cb9b49 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -100,3 +100,8 @@ run {
"-Dlog4j.configurationFile=$buildDir/resources/main/log4j2.xml"
]
}
+
+// excluding simulation tests from the test task
+test {
+ exclude "**/com/flipkart/varadhi/qos/RateLimiterServiceTest*"
+}
diff --git a/server/src/main/java/com/flipkart/varadhi/CoreServices.java b/server/src/main/java/com/flipkart/varadhi/CoreServices.java
index 291ca90b..f98858ca 100644
--- a/server/src/main/java/com/flipkart/varadhi/CoreServices.java
+++ b/server/src/main/java/com/flipkart/varadhi/CoreServices.java
@@ -27,7 +27,6 @@
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.semconv.ServiceAttributes;
import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -45,7 +44,7 @@ public class CoreServices {
public CoreServices(AppConfiguration configuration, ConfigFileResolver configResolver) {
this.configResolver = configResolver;
- this.observabilityStack = setupObservabilityStack(configuration);
+ this.observabilityStack = new ObservabilityStack(configuration);
this.messagingStackProvider = setupMessagingStackProvider(configuration.getMessagingStackOptions());
this.metaStoreProvider = setupMetaStoreProvider(configuration.getMetaStoreOptions());
}
@@ -81,43 +80,40 @@ private MessagingStackProvider setupMessagingStackProvider(MessagingStackOptions
return provider;
}
- private ObservabilityStack setupObservabilityStack(AppConfiguration configuration) {
-
- Resource resource = Resource.getDefault()
- .merge(Resource.create(Attributes.of(ServiceAttributes.SERVICE_NAME, "com.flipkart.varadhi")));
-
- // TODO: make tracing togglable and configurable.
- float sampleRatio = 1.0f;
-
- // exporting spans as logs, but can be replaced with otlp exporter.
- SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
- .addSpanProcessor(BatchSpanProcessor.builder(LoggingSpanExporter.create()).build())
- .setResource(resource)
- .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(sampleRatio)))
- .build();
-
- OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
- .setTracerProvider(sdkTracerProvider)
- .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
- .buildAndRegisterGlobal();
-
- // TODO: make meter registry config configurable. each registry comes with its own config.
- String meterExporter = "jmx";
- MeterRegistry meterRegistry = switch (meterExporter) {
- case "jmx" -> new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM);
- case "prometheus" -> new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
- case "otlp" -> new OtlpMeterRegistry(configuration.getOtelOptions()::get, Clock.SYSTEM);
- default -> null;
- };
- return new ObservabilityStack(openTelemetry, meterRegistry);
- }
-
-
@Getter
- @AllArgsConstructor
public static class ObservabilityStack {
private final OpenTelemetry openTelemetry;
private final MeterRegistry meterRegistry;
+
+ public ObservabilityStack(AppConfiguration configuration) {
+
+ Resource resource = Resource.getDefault()
+ .merge(Resource.create(Attributes.of(ServiceAttributes.SERVICE_NAME, "com.flipkart.varadhi")));
+
+ // TODO: make tracing togglable and configurable.
+ float sampleRatio = 1.0f;
+
+ // exporting spans as logs, but can be replaced with otlp exporter.
+ SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(BatchSpanProcessor.builder(LoggingSpanExporter.create()).build())
+ .setResource(resource)
+ .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(sampleRatio)))
+ .build();
+
+ this.openTelemetry = OpenTelemetrySdk.builder()
+ .setTracerProvider(sdkTracerProvider)
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .buildAndRegisterGlobal();
+
+ // TODO: make meter registry config configurable. each registry comes with its own config.
+ String meterExporter = "otlp";
+ this.meterRegistry = switch (meterExporter) {
+ case "jmx" -> new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM);
+ case "prometheus" -> new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
+ case "otlp" -> new OtlpMeterRegistry(configuration.getOtelOptions()::get, Clock.SYSTEM);
+ default -> null;
+ };
+ }
}
diff --git a/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java b/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java
index 4e216aac..93a0cd61 100644
--- a/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java
+++ b/server/src/main/java/com/flipkart/varadhi/VaradhiApplication.java
@@ -184,7 +184,7 @@ private static Map getComponentVerticles(
) {
return Arrays.stream(memberInfo.roles()).distinct()
.collect(Collectors.toMap(Function.identity(), kind -> switch (kind) {
- case Server -> new WebServerVerticle(config, coreServices, clusterManager);
+ case Server -> new WebServerVerticle(config, memberInfo, coreServices, clusterManager);
case Controller -> new ControllerVerticle(config.getController(), coreServices, clusterManager);
case Consumer -> new ConsumerVerticle(memberInfo, clusterManager);
}));
diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java b/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java
index f1587722..8c107e6f 100644
--- a/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java
+++ b/server/src/main/java/com/flipkart/varadhi/cluster/MessageExchange.java
@@ -2,7 +2,6 @@
import com.flipkart.varadhi.cluster.messages.ClusterMessage;
import com.flipkart.varadhi.cluster.messages.ResponseMessage;
-import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.flipkart.varadhi.utils.JsonMapper;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
@@ -48,7 +47,13 @@ public CompletableFuture send(String routeName, String apiName, ClusterMes
}
public void publish(String routeName, String apiName, ClusterMessage msg) {
- throw new NotImplementedException("publish not implemented");
+ String apiPath = getPath(routeName, apiName, RouteMethod.PUBLISH);
+ try {
+ vertxEventBus.publish(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions);
+ log.debug("publish({}, {}) delivered.", apiPath, msg.getId());
+ } catch (Exception e) {
+ log.error("publish({}, {}) Unexpected failure:{}", apiPath, msg.getId(), e.getMessage());
+ }
}
public CompletableFuture request(String routeName, String apiName, ClusterMessage msg) {
@@ -57,7 +62,7 @@ public CompletableFuture request(String routeName, String apiNa
try {
vertxEventBus.request(apiPath, JsonMapper.jsonSerialize(msg), deliveryOptions, ar -> {
if (ar.succeeded()) {
- log.debug("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body());
+ log.info("request({}, {}) delivered. {}.", apiPath, msg.getId(), ar.result().body());
ResponseMessage response =
JsonMapper.jsonDeserialize((String) ar.result().body(), ResponseMessage.class);
if (response.getException() != null) {
diff --git a/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java b/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java
index 05e59604..7e4b6e29 100644
--- a/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java
+++ b/server/src/main/java/com/flipkart/varadhi/cluster/MessageRouter.java
@@ -1,8 +1,10 @@
package com.flipkart.varadhi.cluster;
-import com.flipkart.varadhi.cluster.messages.*;
-import com.flipkart.varadhi.exceptions.NotImplementedException;
+import com.flipkart.varadhi.cluster.messages.ClusterMessage;
+import com.flipkart.varadhi.cluster.messages.MsgHandler;
+import com.flipkart.varadhi.cluster.messages.RequestHandler;
+import com.flipkart.varadhi.cluster.messages.ResponseMessage;
import com.flipkart.varadhi.exceptions.VaradhiException;
import com.flipkart.varadhi.utils.JsonMapper;
import io.vertx.core.eventbus.DeliveryOptions;
@@ -61,7 +63,7 @@ public void requestHandler(
String apiPath = getApiPath(routeName, apiName, RouteMethod.REQUEST);
vertxEventBus.consumer(apiPath, message -> {
ClusterMessage msg = JsonMapper.jsonDeserialize((String) message.body(), ClusterMessage.class);
- log.debug("Received msg via - request({}, {})", apiPath, msg.getId());
+ log.info("Received msg via - request({}, {})", apiPath, msg.getId());
try {
handler.handle(msg).thenAccept(response -> message.reply(
JsonMapper.jsonSerialize(response),
@@ -90,7 +92,18 @@ public void requestHandler(
}
public void publishHandler(String routeName, String apiName, MsgHandler handler) {
- throw new NotImplementedException("handlePublish not implemented");
+ // TODO(rl): not sure if publishHandler should do anything different as compared to sendHandler except sending reply
+ String apiPath = getApiPath(routeName, apiName, RouteMethod.PUBLISH);
+ vertxEventBus.consumer(apiPath, message -> {
+ ClusterMessage msg = JsonMapper.jsonDeserialize((String) message.body(), ClusterMessage.class);
+ log.debug("Received msg via - publish({}, {})", apiPath, msg.getId());
+ try {
+ // this is async invocation.
+ handler.handle(msg);
+ } catch (Exception e) {
+ log.error("publish handler.handle({}) Unhandled exception: {}", message.body(), e.getMessage());
+ }
+ });
}
private String getApiPath(String routeName, String apiName, RouteMethod method) {
diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java
index ac2365c9..a41d0c6b 100644
--- a/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java
+++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/ControllerVerticle.java
@@ -1,24 +1,32 @@
package com.flipkart.varadhi.verticles.controller;
import com.flipkart.varadhi.CoreServices;
-import com.flipkart.varadhi.cluster.*;
-import com.flipkart.varadhi.controller.OperationMgr;
+import com.flipkart.varadhi.cluster.MembershipListener;
+import com.flipkart.varadhi.cluster.MessageExchange;
+import com.flipkart.varadhi.cluster.MessageRouter;
+import com.flipkart.varadhi.cluster.VaradhiClusterManager;
import com.flipkart.varadhi.controller.AssignmentManager;
+import com.flipkart.varadhi.controller.ControllerApiMgr;
+import com.flipkart.varadhi.controller.DistributedRateLimiterImpl;
+import com.flipkart.varadhi.controller.OperationMgr;
import com.flipkart.varadhi.controller.RetryPolicy;
import com.flipkart.varadhi.controller.config.ControllerConfig;
import com.flipkart.varadhi.controller.impl.LeastAssignedStrategy;
-import com.flipkart.varadhi.entities.cluster.*;
+import com.flipkart.varadhi.core.capacity.TopicCapacityService;
import com.flipkart.varadhi.core.cluster.ConsumerClientFactory;
+import com.flipkart.varadhi.entities.cluster.*;
import com.flipkart.varadhi.exceptions.NotImplementedException;
+import com.flipkart.varadhi.services.VaradhiTopicService;
import com.flipkart.varadhi.spi.db.MetaStoreProvider;
+import com.flipkart.varadhi.spi.services.MessagingStackProvider;
import com.flipkart.varadhi.verticles.consumer.ConsumerClientFactoryImpl;
-import com.flipkart.varadhi.controller.ControllerApiMgr;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import lombok.extern.slf4j.Slf4j;
+import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -35,6 +43,7 @@ public class ControllerVerticle extends AbstractVerticle {
private final MetaStoreProvider metaStoreProvider;
private final MeterRegistry meterRegistry;
private final ControllerConfig controllerConfig;
+ private final VaradhiTopicService varadhiTopicService;
public ControllerVerticle(
ControllerConfig config, CoreServices coreServices, VaradhiClusterManager clusterManager
@@ -43,6 +52,8 @@ public ControllerVerticle(
this.clusterManager = clusterManager;
this.metaStoreProvider = coreServices.getMetaStoreProvider();
this.meterRegistry = coreServices.getMeterRegistry();
+ MessagingStackProvider messagingStackProvider = coreServices.getMessagingStackProvider();
+ this.varadhiTopicService = new VaradhiTopicService(messagingStackProvider.getStorageTopicService(), coreServices.getMetaStoreProvider().getMetaStore());
}
@Override
@@ -51,9 +62,17 @@ public void start(Promise startPromise) {
MessageExchange messageExchange = clusterManager.getExchange(vertx);
ControllerApiMgr controllerApiMgr = getControllerApiMgr(messageExchange);
ControllerApiHandler handler = new ControllerApiHandler(controllerApiMgr);
+ DistributedRateLimiterImpl distributedRateLimiterImpl =
+ new DistributedRateLimiterImpl(5, 1000, new TopicCapacityService() {
+ @Override
+ public int getThroughputLimit(String topic) {
+ return varadhiTopicService.get(topic).getCapacity().getThroughputKBps() * 1024;
+ }
+ }, Clock.systemUTC()); //TODO(rl): config driven
+ TrafficDataHandler trafficDataHandler = new TrafficDataHandler(distributedRateLimiterImpl);
//TODO::Assuming one controller node for time being. Leader election needs to be added.
- onLeaderElected(controllerApiMgr, handler, messageRouter).onComplete(ar -> {
+ onLeaderElected(controllerApiMgr, handler, trafficDataHandler, messageRouter).onComplete(ar -> {
if (ar.failed()) {
startPromise.fail(ar.cause());
} else {
@@ -82,7 +101,8 @@ private RetryPolicy getRetryPolicy() {
}
private Future onLeaderElected(
- ControllerApiMgr controllerApiMgr, ControllerApiHandler handler, MessageRouter messageRouter
+ ControllerApiMgr controllerApiMgr, ControllerApiHandler handler, TrafficDataHandler trafficDataHandle,
+ MessageRouter messageRouter
) {
// any failures should give up the leadership.
//TODO:: check what happens when membership changes post listener setup but prior to bootstrap completion.
@@ -106,7 +126,7 @@ private Future onLeaderElected(
log.error("ConsumerNode:{} failed to join the cluster {}.", "", ce.getCause().getMessage());
}
})).thenAccept(v -> {
- setupApiHandlers(messageRouter, handler);
+ setupApiHandlers(messageRouter, handler, trafficDataHandle);
restoreController(controllerApiMgr, addedConsumers);
});
return Future.fromCompletionStage(future);
@@ -176,13 +196,16 @@ public void stop(Promise stopPromise) {
stopPromise.complete();
}
- private void setupApiHandlers(MessageRouter messageRouter, ControllerApiHandler handler) {
+ private void setupApiHandlers(
+ MessageRouter messageRouter, ControllerApiHandler handler, TrafficDataHandler trafficDataHandler
+ ) {
messageRouter.requestHandler(ROUTE_CONTROLLER, "start", handler::start);
messageRouter.requestHandler(ROUTE_CONTROLLER, "stop", handler::stop);
messageRouter.requestHandler(ROUTE_CONTROLLER, "status", handler::status);
messageRouter.requestHandler(ROUTE_CONTROLLER, "unsideline", handler::unsideline);
messageRouter.requestHandler(ROUTE_CONTROLLER, "getShards", handler::getShards);
messageRouter.sendHandler(ROUTE_CONTROLLER, "update", handler::update);
+ messageRouter.requestHandler(ROUTE_CONTROLLER, "collect", trafficDataHandler::handle);
}
private void setupMembershipListener(ControllerApiMgr controllerApiMgr) {
diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java
new file mode 100644
index 00000000..62754b4d
--- /dev/null
+++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java
@@ -0,0 +1,25 @@
+package com.flipkart.varadhi.verticles.controller;
+
+import com.flipkart.varadhi.cluster.messages.ClusterMessage;
+import com.flipkart.varadhi.cluster.messages.ResponseMessage;
+import com.flipkart.varadhi.qos.DistributedRateLimiter;
+import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
+import com.flipkart.varadhi.qos.entity.SuppressionData;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handles "collect" messages from web-server nodes: feeds the reported client
+ * load into the distributed rate limiter and replies with the resulting
+ * suppression data.
+ */
+@Slf4j
+public class TrafficDataHandler {
+
+    private final DistributedRateLimiter rateLimiter;
+
+    public TrafficDataHandler(DistributedRateLimiter rateLimiter) {
+        this.rateLimiter = rateLimiter;
+    }
+
+    /**
+     * Processes one load report and returns an already-completed future holding
+     * the reply. NOTE(review): {@code addTrafficData} runs synchronously on the
+     * calling thread — confirm it is non-blocking if invoked from an event loop.
+     */
+    public CompletableFuture<ResponseMessage> handle(ClusterMessage message) {
+        ClientLoadInfo loadInfo = message.getData(ClientLoadInfo.class);
+        SuppressionData suppressionData = rateLimiter.addTrafficData(loadInfo);
+        return CompletableFuture.completedFuture(message.getResponseMessage(suppressionData));
+    }
+}
diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java
index 6aa8313f..177c085f 100644
--- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java
+++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java
@@ -4,25 +4,31 @@
import com.flipkart.varadhi.auth.DefaultAuthorizationProvider;
import com.flipkart.varadhi.cluster.MessageExchange;
import com.flipkart.varadhi.cluster.VaradhiClusterManager;
+import com.flipkart.varadhi.cluster.messages.ClusterMessage;
+import com.flipkart.varadhi.config.AppConfiguration;
+import com.flipkart.varadhi.core.cluster.ControllerRestApi;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.TopicCapacityPolicy;
import com.flipkart.varadhi.entities.VaradhiTopic;
-import com.flipkart.varadhi.spi.ConfigFileResolver;
-import com.flipkart.varadhi.spi.services.Producer;
-import com.flipkart.varadhi.utils.ShardProvisioner;
-import com.flipkart.varadhi.utils.VaradhiSubscriptionFactory;
-import com.flipkart.varadhi.verticles.consumer.ConsumerClientFactoryImpl;
-import com.flipkart.varadhi.verticles.controller.ControllerRestClient;
-import com.flipkart.varadhi.config.AppConfiguration;
-import com.flipkart.varadhi.utils.VaradhiTopicFactory;
-import com.flipkart.varadhi.services.VaradhiTopicService;
-import com.flipkart.varadhi.core.cluster.ControllerRestApi;
+import com.flipkart.varadhi.entities.cluster.MemberInfo;
import com.flipkart.varadhi.produce.otel.ProducerMetricHandler;
import com.flipkart.varadhi.produce.services.ProducerService;
+import com.flipkart.varadhi.qos.DistributedRateLimiter;
+import com.flipkart.varadhi.qos.RateLimiterMetrics;
+import com.flipkart.varadhi.qos.RateLimiterService;
+import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
+import com.flipkart.varadhi.qos.entity.SuppressionData;
import com.flipkart.varadhi.services.*;
+import com.flipkart.varadhi.spi.ConfigFileResolver;
import com.flipkart.varadhi.spi.db.IamPolicyMetaStore;
import com.flipkart.varadhi.spi.db.MetaStore;
import com.flipkart.varadhi.spi.services.MessagingStackProvider;
+import com.flipkart.varadhi.spi.services.Producer;
+import com.flipkart.varadhi.utils.ShardProvisioner;
+import com.flipkart.varadhi.utils.VaradhiSubscriptionFactory;
+import com.flipkart.varadhi.utils.VaradhiTopicFactory;
+import com.flipkart.varadhi.verticles.consumer.ConsumerClientFactoryImpl;
+import com.flipkart.varadhi.verticles.controller.ControllerRestClient;
import com.flipkart.varadhi.web.*;
import com.flipkart.varadhi.web.routes.RouteBehaviour;
import com.flipkart.varadhi.web.routes.RouteConfigurator;
@@ -42,9 +48,12 @@
import lombok.experimental.ExtensionMethod;
import lombok.extern.slf4j.Slf4j;
+import java.net.UnknownHostException;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import static com.flipkart.varadhi.core.cluster.ControllerRestApi.ROUTE_CONTROLLER;
@Slf4j
@ExtensionMethod({Extensions.RoutingContextExtension.class})
@@ -52,6 +61,7 @@ public class WebServerVerticle extends AbstractVerticle {
private final Map routeBehaviourConfigurators = new HashMap<>();
private final AppConfiguration configuration;
private final ConfigFileResolver configResolver;
+ private final MemberInfo memberInfo;
private final VaradhiClusterManager clusterManager;
private final MessagingStackProvider messagingStackProvider;
private final MetaStore metaStore;
@@ -62,14 +72,16 @@ public class WebServerVerticle extends AbstractVerticle {
private ProjectService projectService;
private VaradhiTopicService varadhiTopicService;
private SubscriptionService subscriptionService;
+ private RateLimiterService rateLimiterService;
private DlqService dlqService;
private HttpServer httpServer;
public WebServerVerticle(
- AppConfiguration configuration, CoreServices services, VaradhiClusterManager clusterManager
+ AppConfiguration configuration, MemberInfo memberInfo, CoreServices services, VaradhiClusterManager clusterManager
) {
this.configuration = configuration;
this.configResolver = services.getConfigResolver();
+ this.memberInfo = memberInfo;
this.clusterManager = clusterManager;
this.messagingStackProvider = services.getMessagingStackProvider();
this.metaStore = services.getMetaStoreProvider().getMetaStore();
@@ -128,7 +140,25 @@ private void setupEntityServices() {
);
subscriptionService = new SubscriptionService(shardProvisioner, controllerClient, metaStore);
dlqService = new DlqService(controllerClient, new ConsumerClientFactoryImpl(messageExchange));
+ try {
+            // use member hostname as clientId for now.
+ String clientId = memberInfo.hostname();
+ rateLimiterService = new RateLimiterService(new DistributedRateLimiter() {
+ final MessageExchange exchange = clusterManager.getExchange(vertx);
+ @Override
+ public SuppressionData addTrafficData(ClientLoadInfo loadInfo) {
+ ClusterMessage msg = ClusterMessage.of(loadInfo);
+                    CompletableFuture<SuppressionData> suppressionDataResponse =
+ exchange.request(ROUTE_CONTROLLER, "collect", msg)
+ .thenApply(rm -> rm.getResponse(SuppressionData.class));
+ // todo(rl): runtime exceptions not caught. Can this lead to unrecoverable state?
+ return suppressionDataResponse.join();
+ }
+ }, new RateLimiterMetrics(meterRegistry, clientId), 1, clientId); // TODO(rl): convert to config
+ } catch (UnknownHostException e) {
+ log.error("Error creating RateLimiterService", e);
+ }
}
private void performValidations() {
@@ -238,7 +268,7 @@ private List getProduceApiRoutes() {
new ProducerMetricHandler(configuration.getProducerOptions().isMetricEnabled(), meterRegistry);
return new ArrayList<>(
new ProduceHandlers(deployedRegion, headerValidator::validate, producerService, projectService,
- producerMetricsHandler
+ producerMetricsHandler, rateLimiterService
).get());
}
diff --git a/server/src/main/java/com/flipkart/varadhi/web/v1/produce/ProduceHandlers.java b/server/src/main/java/com/flipkart/varadhi/web/v1/produce/ProduceHandlers.java
index e96bd694..f56a49ef 100644
--- a/server/src/main/java/com/flipkart/varadhi/web/v1/produce/ProduceHandlers.java
+++ b/server/src/main/java/com/flipkart/varadhi/web/v1/produce/ProduceHandlers.java
@@ -9,6 +9,7 @@
import com.flipkart.varadhi.services.ProjectService;
import com.flipkart.varadhi.utils.HeaderUtils;
import com.flipkart.varadhi.utils.MessageHelper;
+import com.flipkart.varadhi.qos.RateLimiterService;
import com.flipkart.varadhi.web.Extensions.RequestBodyExtension;
import com.flipkart.varadhi.web.Extensions.RoutingContextExtension;
import com.flipkart.varadhi.web.routes.RouteDefinition;
@@ -45,16 +46,18 @@ public class ProduceHandlers implements RouteProvider {
private final ProjectService projectService;
private final ProducerMetricHandler metricHandler;
private final String produceRegion;
+ private final RateLimiterService rateLimiterService;
public ProduceHandlers(
String produceRegion, Handler headerValidationHandler, ProducerService producerService,
- ProjectService projectService, ProducerMetricHandler metricHandler
+ ProjectService projectService, ProducerMetricHandler metricHandler, RateLimiterService rateLimiterService
) {
this.produceRegion = produceRegion;
this.producerService = producerService;
this.headerValidationHandler = headerValidationHandler;
this.projectService = projectService;
this.metricHandler = metricHandler;
+ this.rateLimiterService = rateLimiterService;
}
@Override
@@ -92,12 +95,18 @@ public void produce(RoutingContext ctx) {
ProducerMetricsEmitter metricsEmitter = metricHandler.getEmitter(ctx.body().length(), produceAttributes);
String varadhiTopicName = VaradhiTopic.buildTopicName(projectName, topicName);
+        if (rateLimiterService != null && !rateLimiterService.isAllowed(varadhiTopicName, ctx.body().length())) {
+ ctx.endRequestWithStatusAndErrorMsg(HTTP_RATE_LIMITED, "Rate limited");
+ return;
+ }
// TODO:: Below is making extra copy, this needs to be avoided.
// ctx.body().buffer().getByteBuf().array() -- method gives complete backing array w/o copy,
// however only required bytes are needed. Need to figure out the correct mechanism here.
byte[] payload = ctx.body().buffer().getBytes();
Message messageToProduce = buildMessageToProduce(payload, ctx.request().headers(), produceIdentity);
+
+
CompletableFuture produceFuture =
producerService.produceToTopic(messageToProduce, varadhiTopicName, metricsEmitter);
produceFuture.whenComplete((produceResult, failure) ->
diff --git a/server/src/test/java/com/flipkart/varadhi/qos/RateLimiterServiceTest.java b/server/src/test/java/com/flipkart/varadhi/qos/RateLimiterServiceTest.java
new file mode 100644
index 00000000..eada8186
--- /dev/null
+++ b/server/src/test/java/com/flipkart/varadhi/qos/RateLimiterServiceTest.java
@@ -0,0 +1,287 @@
+package com.flipkart.varadhi.qos;
+
+import com.flipkart.varadhi.CoreServices;
+import com.flipkart.varadhi.config.AppConfiguration;
+import com.flipkart.varadhi.controller.DistributedRateLimiterImpl;
+import com.flipkart.varadhi.core.capacity.TopicCapacityService;
+import com.flipkart.varadhi.utils.HostUtils;
+import com.flipkart.varadhi.utils.YamlLoader;
+import com.google.common.util.concurrent.RateLimiter;
+import io.micrometer.core.instrument.MeterRegistry;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.time.Clock;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Stream;
+
+import static org.mockito.Mockito.when;
+
+@Slf4j
+public class RateLimiterServiceTest {
+
+ private static MeterRegistry meterRegistry;
+ private DistributedRateLimiterImpl distributedRateLimiterImpl;
+ @Mock
+ private TopicCapacityService topicCapacityService;
+
+ @BeforeAll
+ public static void setup() throws UnknownHostException {
+ HostUtils.initHostUtils();
+ setupMetrics();
+ }
+
+ public static void setupMetrics() {
+ AppConfiguration appConfiguration = YamlLoader.loadConfig(
+ "test/configuration.yml",
+ AppConfiguration.class
+ );
+ CoreServices.ObservabilityStack observabilityStack = new CoreServices.ObservabilityStack(appConfiguration);
+ meterRegistry = observabilityStack.getMeterRegistry();
+ }
+
+ @BeforeEach
+ public void setUpController() {
+ MockitoAnnotations.openMocks(this); // Initialize mocks
+
+ //setup controller side of things
+ Clock clock = Clock.systemUTC();
+ distributedRateLimiterImpl = new DistributedRateLimiterImpl(10, 1000, topicCapacityService, clock);
+ }
+
+    private static Stream<Arguments> provideRateLimitTestFilePaths() {
+ return Stream.of(
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_low.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_low.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_medium.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_medium.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_high.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_high.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_very_high.profile"),
+ Arguments.of("src/test/resources/simulation_profiles/test_skewness_very_high.profile")
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideRateLimitTestFilePaths")
+ public void clientLoadSimulation(String filePath) throws IOException, InterruptedException {
+ BufferedReader reader = new BufferedReader(new FileReader(filePath));
+ String line;
+        List<Thread> threads = new ArrayList<>();
+        ExecutorService executorService = Executors.newFixedThreadPool(10);
+        Map<String, RateLimiterService> clientRateLimiterMap = new HashMap<>();
+        // data structure that stores client -> topic -> (throughput, qps, duration, allowedBytes, rejectedBytes)
+        // used to calculate error rate
+        Map<String, Map<String, List<TestData>>> topicClientLoadMap = new ConcurrentHashMap<>();
+        Map<String, Integer> topicThroughputQuotaMap = new HashMap<>();
+
+ while ((line = reader.readLine()) != null) {
+ String[] parts = line.split(",", 3);
+ String client = parts[0];
+ String[] topicLoads = parts[2].split(":");
+ String topic = parts[1];
+ int throughputQuota = Integer.parseInt(topicLoads[0]);
+ topicThroughputQuotaMap.put(topic, throughputQuota);
+ log.debug("Setting throughput for topic: {}, throughput: {}", topic, throughputQuota);
+ when(topicCapacityService.getThroughputLimit(topic)).thenReturn(throughputQuota);
+
+ // check if ratelimiterservice exists for a topic
+ clientRateLimiterMap.putIfAbsent(client, createRateLimiterSvc(client));
+
+ Runnable clientLoadSimulator = () -> {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Client: ").append(client).append(" topic: ").append(topic).append(": \n");
+
+ for (int i = 1; i < topicLoads.length; i++) {
+ String[] loadParts = topicLoads[i].split(",");
+ long duration = Long.parseLong(loadParts[0]) * 1000; // Convert to milliseconds
+ long throughput = Long.parseLong(loadParts[1]);
+ long qps = Long.parseLong(loadParts[2]);
+ long dataSize = throughput / qps;
+ long startTime = System.currentTimeMillis();
+ long allowedBytes = 0, rejectedBytes = 0;
+
+ RateLimiter rateLimiter = RateLimiter.create(qps);
+ while (System.currentTimeMillis() - startTime < duration) {
+ rateLimiter.acquire();
+ boolean allowed = clientRateLimiterMap.get(client).isAllowed(topic, dataSize);
+ if (allowed) {
+ allowedBytes += dataSize;
+ } else {
+ rejectedBytes += dataSize;
+ }
+ }
+ sb.append("Start Time: ").append(new Date(startTime))
+ .append(", Duration: ").append(duration)
+ .append(", throughput: ").append(throughput)
+ .append(", allowedBytes: ").append(allowedBytes)
+ .append(", rejectedBytes: ").append(rejectedBytes)
+ .append("\n");
+
+ // store result data for error rate calculation later on
+ log.debug(
+ "[] Client: {}, Topic: {}, Duration: {}, Allowed Bytes: {}, Rejected Bytes: {}", client,
+ topic, duration, allowedBytes, rejectedBytes
+ );
+ storeResult(
+ topicClientLoadMap, client, topic, duration / 1000, allowedBytes,
+ allowedBytes + rejectedBytes
+ );
+ }
+ log.debug(sb.toString());
+ };
+
+ threads.add(new Thread(clientLoadSimulator));
+ }
+
+ // Start all threads at the same time
+ for (Thread thread : threads) {
+ executorService.submit(thread);
+ }
+
+ // Wait for all threads to finish
+ executorService.shutdown();
+ while (!executorService.isTerminated()) {
+ Thread.sleep(100);
+ }
+
+ // Calculate error rate
+ topicClientLoadMap.forEach((topic, clientLoadMap) -> {
+ // here clientLoadMap is map of client -> List
+ // iterate through each client data assuming every client starts from the same point
+ // consider duration and sum up the allowedBytes for each client and compare with the allowed throughput
+            List<List<TestData>> clientDataList = new ArrayList<>();
+ clientLoadMap.forEach((client, testDataList) -> clientDataList.add(testDataList));
+ if (clientDataList.size() > 1) {
+ List errors = calculateNormalisedError(clientDataList, topicThroughputQuotaMap.get(topic));
+ log.info("topic: {} errors: {}", topic, errors);
+ log.info("topic: {} absolute error: {}", topic, calculateAbsoluteError(errors));
+ log.info("topic: {} mean error: {}", topic, calculateAbsoluteError(errors)/errors.size());
+ log.info("topic: {} standard deviation: {}", topic, calculateStandardDeviation(errors));
+ }
+ });
+ }
+
+    private static List<Double> calculateNormalisedError(List<List<TestData>> clientDataList, long throughput) {
+        List<TestData> result = new ArrayList<>();
+
+        while (true) {
+            long minDuration = Long.MAX_VALUE;
+            long sumAllowedBytes = 0;
+            long sumProducedBytes = 0;
+            boolean hasMore = false;
+
+            for (List<TestData> list : clientDataList) {
+                if (!list.isEmpty()) {
+                    hasMore = true;
+                    TestData dv = list.get(0);
+                    if (dv.duration < minDuration) {
+                        minDuration = dv.duration;
+                    }
+                }
+            }
+
+            if (!hasMore) {
+                break;
+            }
+
+            for (List<TestData> clientTestData : clientDataList) {
+                if (!clientTestData.isEmpty()) {
+                    TestData dv = clientTestData.get(0);
+                    sumAllowedBytes += (minDuration * dv.allowedBytes / dv.duration);
+                    sumProducedBytes += (minDuration * dv.generatedBytes / dv.duration);
+                    if (dv.duration == minDuration) {
+                        clientTestData.remove(0);
+                    } else {
+                        dv.allowedBytes -= (minDuration * dv.allowedBytes / dv.duration);
+                        dv.generatedBytes -= (minDuration * dv.generatedBytes / dv.duration);
+                        dv.duration -= minDuration;
+                        if (dv.allowedBytes <= 0) {
+                            // something failed here, find out why
+                            throw new RuntimeException("Unexpected error");
+                        }
+                    }
+                }
+            }
+
+            result.add(new TestData(minDuration, sumAllowedBytes, sumProducedBytes));
+        }
+
+        List<Double> errors = new ArrayList<>();
+        for (TestData dv : result) {
+            long maxBytes = Math.min(throughput * dv.duration, dv.generatedBytes);
+            long allowedBytes = dv.allowedBytes;
+            errors.add(((double) (Math.abs(allowedBytes - maxBytes)) / throughput));
+            log.debug(
+                "Duration: {}, Allowed Bytes: {}, Generated Bytes: {}", dv.duration, dv.allowedBytes,
+                dv.generatedBytes
+            );
+        }
+
+        return errors;
+    }
+
+    private static double calculateAbsoluteError(List<Double> errors) {
+        return errors.stream().mapToDouble(val -> val).sum();
+    }
+
+    private static double calculateStandardDeviation(List<Double> errors) {
+        double mean = errors.stream().mapToDouble(val -> val).average().orElse(0.0);
+        double variance = errors.stream().mapToDouble(val -> Math.pow(val - mean, 2)).average().orElse(0.0);
+        return Math.sqrt(variance);
+    }
+
+ private void storeResult(
+        Map<String, Map<String, List<TestData>>> topicClientLoadMap, String client, String topic, long duration,
+ long allowedBytes, long generatedBytes
+ ) {
+ topicClientLoadMap.compute(topic, (k, v) -> {
+ if (v == null) {
+ v = new ConcurrentHashMap<>();
+ }
+ v.compute(client, (k1, v1) -> {
+ if (v1 == null) {
+ v1 = new ArrayList<>();
+ }
+ TestData testData = new TestData(duration, allowedBytes, generatedBytes);
+ v1.add(testData);
+ return v1;
+ });
+ return v;
+ });
+ }
+
+ private RateLimiterService createRateLimiterSvc(String clientId) throws UnknownHostException {
+ return new RateLimiterService(
+ load -> distributedRateLimiterImpl.addTrafficData(load),
+ new RateLimiterMetrics(meterRegistry, clientId), 1, clientId
+ );
+ }
+
+ @Getter
+ private static class TestData {
+ private long duration;
+ private long allowedBytes;
+ private long generatedBytes;
+
+ public TestData(long duration, long allowedBytes, long generatedBytes) {
+ this.duration = duration;
+ this.allowedBytes = allowedBytes;
+ this.generatedBytes = generatedBytes;
+ }
+ }
+}
diff --git a/server/src/test/java/com/flipkart/varadhi/web/produce/ProduceTestBase.java b/server/src/test/java/com/flipkart/varadhi/web/produce/ProduceTestBase.java
index c42242d1..400de423 100644
--- a/server/src/test/java/com/flipkart/varadhi/web/produce/ProduceTestBase.java
+++ b/server/src/test/java/com/flipkart/varadhi/web/produce/ProduceTestBase.java
@@ -7,6 +7,7 @@
import com.flipkart.varadhi.produce.otel.ProducerMetricsEmitterNoOpImpl;
import com.flipkart.varadhi.produce.services.ProducerService;
import com.flipkart.varadhi.services.ProjectService;
+import com.flipkart.varadhi.qos.RateLimiterService;
import com.flipkart.varadhi.web.RequestTelemetryConfigurator;
import com.flipkart.varadhi.web.SpanProvider;
import com.flipkart.varadhi.web.WebTestBase;
@@ -18,6 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -27,7 +29,7 @@ public class ProduceTestBase extends WebTestBase {
ProjectService projectService;
String deployedRegion = "region1";
String serviceHost = "localhost";
-
+ RateLimiterService rateLimiterService;
     ArgumentCaptor<Message> msgCapture;
String topicPath = "/projects/project1/topics/topic1/produce";
String topicFullName = "project1.topic1";
@@ -51,8 +53,11 @@ public void setUp() throws InterruptedException {
HeaderValidationHandler headerHandler = new HeaderValidationHandler(options);
ProducerMetricHandler metricHandler = mock(ProducerMetricHandler.class);
doReturn(new ProducerMetricsEmitterNoOpImpl()).when(metricHandler).getEmitter(anyInt(), any());
+ rateLimiterService = mock(RateLimiterService.class);
+ // no rate limit in tests
+ doReturn(true).when(rateLimiterService).isAllowed(any(), anyLong());
produceHandlers = new ProduceHandlers(deployedRegion, headerHandler::validate, producerService, projectService,
- metricHandler
+ metricHandler, rateLimiterService
);
route = router.post("/projects/:project/topics/:topic/produce");
msgCapture = ArgumentCaptor.forClass(Message.class);
diff --git a/server/src/test/resources/simulation_profiles/single_client_multi_topic_test b/server/src/test/resources/simulation_profiles/single_client_multi_topic_test
new file mode 100644
index 00000000..f393c609
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/single_client_multi_topic_test
@@ -0,0 +1,5 @@
+client1,topic1:10,4000,10:10,5000,12:10,6000,15:10,7000,17:10,8000,20:10,9000,22:10,10000,25:10,11000,27:10,12000,30:10,13000,32:10,14000,35:10,15000,37:10,16000,40:10,17000,42:10,18000,45:10,19000,47:10,20000,50
+client1,topic2:10,4000,10:10,5000,12:10,6000,15:10,7000,17:10,8000,20:10,9000,22:10,10000,25:10,11000,27:10,12000,30:10,13000,32:10,14000,35:10,15000,37:10,16000,40:10,17000,42:10,18000,45:10,19000,47:10,20000,50
+client1,topic3:10,4000,10:10,5000,12:10,6000,15:10,7000,17:10,8000,20:10,9000,22:10,10000,25:10,11000,27:10,12000,30:10,13000,32:10,14000,35:10,15000,37:10,16000,40:10,17000,42:10,18000,45:10,19000,47:10,20000,50
+client1,topic4:10,4000,10:10,5000,12:10,6000,15:10,7000,17:10,8000,20:10,9000,22:10,10000,25:10,11000,27:10,12000,30:10,13000,32:10,14000,35:10,15000,37:10,16000,40:10,17000,42:10,18000,45:10,19000,47:10,20000,50
+client1,topic5:10,4000,10:10,5000,12:10,6000,15:10,7000,17:10,8000,20:10,9000,22:10,10000,25:10,11000,27:10,12000,30:10,13000,32:10,14000,35:10,15000,37:10,16000,40:10,17000,42:10,18000,45:10,19000,47:10,20000,50
diff --git a/server/src/test/resources/simulation_profiles/single_client_single_topic_test b/server/src/test/resources/simulation_profiles/single_client_single_topic_test
new file mode 100644
index 00000000..35030217
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/single_client_single_topic_test
@@ -0,0 +1 @@
+client1,topic1:10,4000,10,4000:10,5000,12,5000:10,6000,15,6000:10,7000,17,7000:10,8000,20,8000:10,9000,22,9000:10,10000,25,10000:10,11000,27,10240:10,12000,30,10240:10,13000,32,10240:10,14000,35,10240:10,15000,37,10240:10,16000,40,10240:10,17000,42,10240:10,18000,45,10240:10,19000,47,10240:10,20000,50,10240
diff --git a/server/src/test/resources/simulation_profiles/test_load.profile b/server/src/test/resources/simulation_profiles/test_load.profile
new file mode 100644
index 00000000..2631ece5
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_load.profile
@@ -0,0 +1,4 @@
+client1,topic1,1024:4,4000,100:4,7000,100:4,10000,100
+client1,topic2,1024:4,5000,100:4,15000,100:4,30000,100
+client2,topic1,1024:2,4000,100:8,6000,100
+client3,topic2,1024:2,2000,100:6,15000,100
diff --git a/server/src/test/resources/simulation_profiles/test_load1.profile b/server/src/test/resources/simulation_profiles/test_load1.profile
new file mode 100644
index 00000000..beb1a235
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_load1.profile
@@ -0,0 +1,4 @@
+client1,topic1,10000:10,4000,100:10,7000,100:10,10000,100:10,15000,100:10,100000,100
+client1,topic2,10000:10,5000,100:10,15000,100:10,30000,100
+client2,topic1,10000:10,4000,100:10,6000,100
+client3,topic2,10000:10,2000,100:10,15000,100
diff --git a/server/src/test/resources/simulation_profiles/test_load2.profile b/server/src/test/resources/simulation_profiles/test_load2.profile
new file mode 100644
index 00000000..2f633598
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_load2.profile
@@ -0,0 +1,4 @@
+client1,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
+client2,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
+client3,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
+client4,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
diff --git a/server/src/test/resources/simulation_profiles/test_skewness_high.profile b/server/src/test/resources/simulation_profiles/test_skewness_high.profile
new file mode 100644
index 00000000..2e576f60
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_skewness_high.profile
@@ -0,0 +1,4 @@
+client1,topic1,10000:10,16000,100:10,3000,100:10,3000,100:10,3000,100:10,16000,100:10,3000,100:10,3000,100:10,3000,100:10,16000,100
+client2,topic1,10000:10,3000,100:10,16000,100:10,3000,100:10,3000,100:10,3000,100:10,16000,100:10,3000,100:10,3000,100:10,3000,100
+client3,topic1,10000:10,3000,100:10,3000,100:10,16000,100:10,3000,100:10,3000,100:10,3000,100:10,16000,100:10,3000,100:10,3000,100
+client4,topic1,10000:10,3000,100:10,3000,100:10,3000,100:10,16000,100:10,3000,100:10,3000,100:10,3000,100:10,16000,100:10,3000,100
diff --git a/server/src/test/resources/simulation_profiles/test_skewness_low.profile b/server/src/test/resources/simulation_profiles/test_skewness_low.profile
new file mode 100644
index 00000000..a262fe2c
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_skewness_low.profile
@@ -0,0 +1,4 @@
+client1,topic1,10000:10,7000,100:10,6000,100:10,6000,100:10,6000,100:10,7000,100:10,6000,100:10,6000,100:10,6000,100:10,7000,100
+client2,topic1,10000:10,6000,100:10,7000,100:10,6000,100:10,6000,100:10,6000,100:10,7000,100:10,6000,100:10,6000,100:10,6000,100
+client3,topic1,10000:10,6000,100:10,6000,100:10,7000,100:10,6000,100:10,6000,100:10,6000,100:10,7000,100:10,6000,100:10,6000,100
+client4,topic1,10000:10,6000,100:10,6000,100:10,6000,100:10,7000,100:10,6000,100:10,6000,100:10,6000,100:10,7000,100:10,6000,100
diff --git a/server/src/test/resources/simulation_profiles/test_skewness_medium.profile b/server/src/test/resources/simulation_profiles/test_skewness_medium.profile
new file mode 100644
index 00000000..b44fcbb5
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_skewness_medium.profile
@@ -0,0 +1,4 @@
+client1,topic1,10000:10,8500,100:10,5500,100:10,5500,100:10,5500,100:10,8500,100:10,5500,100:10,5500,100:10,5500,100:10,8500,100
+client2,topic1,10000:10,5500,100:10,8500,100:10,5500,100:10,5500,100:10,5500,100:10,8500,100:10,5500,100:10,5500,100:10,5500,100
+client3,topic1,10000:10,5500,100:10,5500,100:10,8500,100:10,5500,100:10,5500,100:10,5500,100:10,8500,100:10,5500,100:10,5500,100
+client4,topic1,10000:10,5500,100:10,5500,100:10,5500,100:10,8500,100:10,5500,100:10,5500,100:10,5500,100:10,8500,100:10,5500,100
diff --git a/server/src/test/resources/simulation_profiles/test_skewness_very_high.profile b/server/src/test/resources/simulation_profiles/test_skewness_very_high.profile
new file mode 100644
index 00000000..f7524c1f
--- /dev/null
+++ b/server/src/test/resources/simulation_profiles/test_skewness_very_high.profile
@@ -0,0 +1,4 @@
+client1,topic1,10000:10,22600,100:10,800,100:10,800,100:10,800,100:10,22600,100:10,800,100:10,800,100:10,800,100:10,22600,100
+client2,topic1,10000:10,800,100:10,22600,100:10,800,100:10,800,100:10,800,100:10,22600,100:10,800,100:10,800,100:10,800,100
+client3,topic1,10000:10,800,100:10,800,100:10,22600,100:10,800,100:10,800,100:10,800,100:10,22600,100:10,800,100:10,800,100
+client4,topic1,10000:10,800,100:10,800,100:10,800,100:10,22600,100:10,800,100:10,800,100:10,800,100:10,22600,100:10,800,100
diff --git a/settings.gradle b/settings.gradle
index 0be2edd1..3382085f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -8,5 +8,5 @@
*/
rootProject.name = 'varadhi'
-include('entities', 'spi', 'common', 'core', 'messaging', 'pulsar', 'consumer', 'controller', 'authz', 'server')
+include('entities', 'spi', 'common', 'core', 'messaging', 'pulsar', 'consumer', 'controller', 'authz', 'server', 'rate-limiter')
diff --git a/setup/helm/varadhi/values/server_values.yaml b/setup/helm/varadhi/values/server_values.yaml
new file mode 100644
index 00000000..5a7444a0
--- /dev/null
+++ b/setup/helm/varadhi/values/server_values.yaml
@@ -0,0 +1,240 @@
+# Default values for varadhi.
+# This is a YAML-formatted file.
+# Declare variables to be passed into your templates.
+
+clusterDomain: cluster.local
+
+deployment:
+ name: server
+ replicaCount: 3
+ autoscaling:
+ enabled: false
+ minReplicas: 1
+ maxReplicas: 100
+ targetCPUUtilizationPercentage: 80
+ podAnnotations: { }
+ podSecurityContext: { }
+ terminationGracePeriodSeconds: 5
+ dnsSettings:
+ probes:
+ securityContext:
+ runAsNonRoot: true
+ runAsUser: 10222
+ capabilities:
+ add:
+ - NET_ADMIN
+ resources:
+ requests:
+ memory: 500Mi
+ cpu: 500m
+ limits:
+ memory: 500Mi
+ cpu: 500m
+ service:
+ annotations:
+ frontendPort: 8888
+
+zkDeployment:
+ enabled: false
+ name: zookeeper
+ replicaCount: 1
+ updateStrategy:
+ type: RollingUpdate
+ podManagementPolicy: Parallel
+ restartPodsOnConfigMapChange: false
+ annotations:
+ nodeSelector:
+ tolerations:
+ affinity:
+ anti_affinity: false
+ anti_affinity_topology_key: kubernetes.io/hostname
+ type: requiredDuringSchedulingIgnoredDuringExecution
+ gracePeriod: 5
+ podSecurityContext:
+ ports:
+ http: 8000
+ client: 2181
+ follower: 2888
+ leaderElection: 3888
+ probe:
+ liveness:
+ enabled: true
+ failureThreshold: 10
+ initialDelaySeconds: 20
+ periodSeconds: 30
+ timeoutSeconds: 30
+ readiness:
+ enabled: true
+ failureThreshold: 10
+ initialDelaySeconds: 20
+ periodSeconds: 30
+ timeoutSeconds: 30
+ startup:
+ enabled: false
+ failureThreshold: 30
+ initialDelaySeconds: 20
+ periodSeconds: 30
+ timeoutSeconds: 30
+ volumes:
+ # use a persistent volume or emptyDir
+ persistence: true
+ ## If you already have an existent storage class and want to reuse it, you can specify its name with the option below
+ ##
+ storageClassName: local-path
+ #
+ ## Instead if you want to create a new storage class define it below
+ ## If left undefined no storage class will be defined along with PVC
+ ##
+ storageClass:
+ # type: pd-ssd
+ # fsType: xfs
+ # provisioner: kubernetes.io/gce-pd
+ selector: { }
+ data:
+ name: data
+ size: 10Mi
+ dataLog:
+ name: datalog
+ size: 10Mi
+ conf:
+ name: zk-conf
+ scripts:
+ name: zk-scripts
+ service:
+ annotations:
+ configMapName: zookeeper
+ scriptsConfigMapName: zookeeper-scripts
+ logLevel: INFO
+ configData:
+ tickTime: 2000
+ globalOutstandingLimit: 5000
+ clientPort: 2181
+ maxClientCnxns: 10000
+ initLimit: 150
+ syncLimit: 15
+ skipACL: yes
+ purgeInterval: 24
+ snapRetainCount: 10
+ peerType: "participant"
+
+images:
+ pullPolicy: IfNotPresent
+ appImage:
+ registry: jfrog.fkinternal.com
+ repository: varadhi-oss/varadhi
+ tag: test
+ zookeeper:
+ registry: docker.io
+ repository: zookeeper
+ tag: 3.7.0
+
+
+#############################################################################
+# Config Settings for respective Varadhi components and their dependencies
+############################################################################
+
+####################################################
+# Varadhi app -- Server (rest api + producer) config
+####################################################
+varadhi:
+ app:
+ name: server
+ configMapName: server
+ restOptions:
+ deployedRegion: "default"
+ defaultOrg: "default"
+ defaultTeam: "public"
+ defaultProject: "public"
+ producerOptions:
+ producerCacheBuilderSpec: ""
+ topicCacheBuilderSpec: ""
+ metricEnabled: true
+ vertxOptions:
+ eventLoopPoolSize: 1
+ workerPoolSize: 1
+ internalBlockingPoolSize: 1
+ verticleDeploymentOptions:
+ instances: 1
+ worker: false
+ workerPoolSize: 2
+ deliveryOptions:
+ timeoutMs: 1000
+ tracingPolicy: "PROPAGATE"
+ httpServerOptions:
+ port: 18488
+ alpnVersions: [ "HTTP_1_1", "HTTP_2" ]
+ decompressionSupported: false
+ useAlpn: true
+ authenticationEnabled: false
+ authentication:
+ jwt:
+ jwksUrl: "http://localhost:9090/default/jwks"
+ options:
+ issuer: "http://localhost:9090/default"
+ ignoreExpiration: true
+ authorizationEnabled: true
+ authorization:
+ superUsers: [ "thanos" ]
+ providerClassName: "com.flipkart.varadhi.auth.DefaultAuthorizationProvider"
+ messagingStackOptions:
+ providerClassName: "com.flipkart.varadhi.pulsar.PulsarStackProvider"
+ metaStoreOptions:
+ providerClassName: "com.flipkart.varadhi.db.ZookeeperProvider"
+ featureFlags:
+ leanDeployment: false
+ member:
+ roles: [ "Server" ]
+ cpuCount: 1
+ nicMBps: 100
+
+#############################################
+# Varadhi Server logging config
+#############################################
+logging:
+ configMapName: logging
+ level: info
+
+#############################################
+# Metastore config -- zookeeper
+#############################################
+metastore:
+ configMapName: metastore
+ zookeeper:
+ connectUrl: "varadhi-zookeeper-0.varadhi-zookeeper.default.svc.cluster.local"
+ sessionTimeoutMs: 60000
+ connectTimeoutMs: 2000
+
+
+#############################################
+# Messaging stack config - pulsar
+#############################################
+messaging:
+ configMapName: messaging
+ pulsar:
+ adminOptions:
+ serviceHttpUrl: "http://host.docker.internal:8080"
+ connectionTimeoutMs: 2000
+ readTimeoutMs: 2000
+ requestTimeoutMs: 2000
+ clientOptions:
+ serviceUrl: "http://host.docker.internal:8080"
+ keepAliveIntervalSecs: 30 # Pulsar default (30 secs)
+ ioThreads: 2 # start with 2, might need further tuning (Pulsar default is 1).
+ connectionsPerBroker: 1 # Pulsar default is 1.
+ maxConcurrentLookupRequests: 5000 # Pulsar default is 5000.
+ maxLookupRequests: 50000 # Pulsar default is 50000.
+ maxLookupRedirects: 20 # Pulsar default is 20.
+ maxNumberOfRejectedRequestPerConnection: 50 # Pulsar default is 50.
+ memoryLimit: 0 # Pulsar default is no limit (0).
+ operationTimeoutMs: 2000 # keep it low to fail fast. Pulsar default is 30 Secs.
+ connectionTimeoutMs: 2000 # keep it low to fail fast. Pulsar default is 10 Secs.
+ lookupTimeoutMs: 10000 # 5 times operationTimeout
+ initialBackoffIntervalMs: 500 # some random value.
+        maxBackoffIntervalMs: 1000 # half of operationTimeoutMs.
+
+#############################################
+# Authz provider config
+#############################################
+authzProvider:
+ configMapName: authorization
+