Skip to content

Commit

Permalink
normalize error and decouple get SF call
Browse files Browse the repository at this point in the history
  • Loading branch information
iosdev747 committed Oct 29, 2024
1 parent e800581 commit ab7dd60
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 95 deletions.
1 change: 1 addition & 0 deletions controller/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
api(project(":core"))
api(project(':rateLimiter'))

implementation(project(":common"))
implementation("io.vertx:vertx-micrometer-metrics")
implementation('com.fasterxml.jackson.core:jackson-databind')
implementation("org.apache.commons:commons-lang3")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.qos.entity.ClientHistory;
import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
import com.flipkart.varadhi.qos.entity.SuppressionData;
import com.flipkart.varadhi.qos.entity.SuppressionFactor;
import com.flipkart.varadhi.qos.entity.TopicLoadInfo;
import com.flipkart.varadhi.utils.FutureUtil;
import com.google.common.base.Ticker;

import java.util.ArrayList;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableDouble;

Expand All @@ -13,7 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class SuppressionManager {
public class SuppressionManager implements SuppressionService {
private Map<String, ClientHistory<TopicLoadInfo>> topicTrafficDataMap; // topic to client load info
private int windowSize;
private TopicLimitService topicLimitService;
Expand Down Expand Up @@ -77,4 +83,26 @@ private boolean isExpired(long time) {
return (ticker.read() - windowSize * 1000L) > time;
}

@Override
public CompletableFuture<SuppressionData> addTrafficDataAsync(ClientLoadInfo info) {

SuppressionData suppressionData = new SuppressionData();
List<CompletableFuture<SuppressionFactor>> suppressionFactorFuture = new ArrayList<>();

info.getTopicUsageList().forEach((trafficData) -> {
suppressionFactorFuture.add(this.addTrafficData(
info.getClientId(),
new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
).whenComplete((suppressionFactor, throwable) -> {
if (throwable != null) {
log.error("Error while calculating suppression factor", throwable);
return;
}
log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor());
suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor);
}));
});

return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> suppressionData);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.flipkart.varadhi.verticles.webserver;
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
import com.flipkart.varadhi.qos.entity.SuppressionData;

import java.util.concurrent.CompletableFuture;

public interface TrafficSender {
CompletableFuture<SuppressionData> send(ClientLoadInfo loadInfo);
public interface SuppressionService {
CompletableFuture<SuppressionData> addTrafficDataAsync(ClientLoadInfo info);

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@
import com.flipkart.varadhi.config.AppConfiguration;
import com.flipkart.varadhi.controller.SuppressionManager;
import com.flipkart.varadhi.controller.TopicLimitService;
import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
import com.flipkart.varadhi.qos.entity.SuppressionData;
import com.flipkart.varadhi.qos.entity.SuppressionFactor;
import com.flipkart.varadhi.qos.entity.TopicLoadInfo;
import com.flipkart.varadhi.utils.FutureUtil;
import com.flipkart.varadhi.utils.HostUtils;
import com.flipkart.varadhi.verticles.webserver.RateLimiterService;
import com.flipkart.varadhi.verticles.webserver.TrafficSender;
import com.google.common.util.concurrent.RateLimiter;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.Getter;
Expand All @@ -30,7 +24,6 @@
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -42,33 +35,45 @@
@Slf4j
public class RateLimiterServiceTest {

// private RateLimiterService rateLimiterService;
private static MeterRegistry meterRegistry;
private SuppressionManager suppressionManager;
private MockTicker ticker;
@Mock
private TopicLimitService topicLimitService;

@BeforeAll
public static void setup() throws UnknownHostException {
HostUtils.initHostUtils();
setupMetrics();
}

public static void setupMetrics() {
String[] args = {"src/test/resources/testConfiguration.yml"};
AppConfiguration configuration = readConfiguration(args);
// CoreServices services = new CoreServices(configuration);
CoreServices.ObservabilityStack observabilityStack = new CoreServices.ObservabilityStack(configuration);
meterRegistry = observabilityStack.getMeterRegistry();
}

@BeforeEach
public void setUpController() throws UnknownHostException {
public void setUpController() {
MockitoAnnotations.openMocks(this); // Initialize mocks

//setup controller side of things
ticker = new MockTicker(System.currentTimeMillis());
suppressionManager = new SuppressionManager(10, topicLimitService, ticker);
}

private static Stream<Arguments> provideRateLimitTestCSVPaths() {
private static Stream<Arguments> provideRateLimitTestFilePaths() {
return Stream.of(
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"),
Arguments.of("src/test/resources/simulation_profiles/test_load1.profile")
// todo(rl): fix these profiles
// Arguments.of("src/test/resources/simulation_profiles/test_load.profile")
Expand All @@ -78,7 +83,7 @@ private static Stream<Arguments> provideRateLimitTestCSVPaths() {
}

@ParameterizedTest
@MethodSource("provideRateLimitTestCSVPaths")
@MethodSource("provideRateLimitTestFilePaths")
public void clientLoadSimulation(String filePath) throws IOException, InterruptedException {
BufferedReader reader = new BufferedReader(new FileReader(filePath));
String line;
Expand All @@ -97,22 +102,21 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte
String topic = parts[1];
int throughputQuota = Integer.parseInt(topicLoads[0]);
topicThroughputQuotaMap.put(topic, throughputQuota);
log.info("Setting throughput for topic: {}, throughput: {}", topic, throughputQuota);
when(topicLimitService.getThroughput(topic)).thenReturn(throughputQuota);

// check if ratelimiterservice exists for a topic
clientRateLimiterMap.putIfAbsent(client, createRateLimiterSvc());
clientRateLimiterMap.putIfAbsent(client, createRateLimiterSvc(client));

Runnable clientLoadSimulator = () -> {
StringBuilder sb = new StringBuilder();
sb.append("Client: ").append(client).append(" topic: ").append(topic).append(": \n");
// long lastAllowedBytes = 0, lastRejectedBytes = 0;

for (int i = 1; i < topicLoads.length; i++) {
String[] loadParts = topicLoads[i].split(",");
long duration = Long.parseLong(loadParts[0]) * 1000; // Convert to milliseconds
long throughput = Long.parseLong(loadParts[1]);
long qps = Long.parseLong(loadParts[2]);
// long allowedByClient = Long.parseLong(loadParts[3]);
long dataSize = throughput / qps;
long startTime = System.currentTimeMillis();
long allowedBytes = 0, rejectedBytes = 0;
Expand All @@ -135,8 +139,14 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte
.append("\n");

// store result data for error rate calculation later on
log.info("[] Client: {}, Topic: {}, Duration: {}, Allowed Bytes: {}, Rejected Bytes: {}", client, topic, duration, allowedBytes, rejectedBytes);
storeResult(topicClientLoadMap, client, topic, duration/1000, allowedBytes, allowedBytes + rejectedBytes);
log.info(
"[] Client: {}, Topic: {}, Duration: {}, Allowed Bytes: {}, Rejected Bytes: {}", client,
topic, duration, allowedBytes, rejectedBytes
);
storeResult(
topicClientLoadMap, client, topic, duration / 1000, allowedBytes,
allowedBytes + rejectedBytes
);
}
log.info(sb.toString());
};
Expand Down Expand Up @@ -164,16 +174,16 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte
clientLoadMap.forEach((client, testDataList) -> {
clientDataList.add(testDataList);
});
if(clientDataList.size() > 1) {
List<Long> errors = calculateError(clientDataList, topicThroughputQuotaMap.get(topic));
if (clientDataList.size() > 1) {
List<Double> errors = calculateNormalisedError(clientDataList, topicThroughputQuotaMap.get(topic));
log.info("topic: {} errors: {}", topic, errors);
log.info("topic: {} absolute error: {}", topic, calculateAbsoluteError(errors));
log.info("Standard Deviation: {}", calculateStandardDeviation(errors));
}
});
}

private static List<Long> calculateError(List<List<TestData>> clientDataList, long throguhput) {
private static List<Double> calculateNormalisedError(List<List<TestData>> clientDataList, long throguhput) {
List<TestData> result = new ArrayList<>();

while (true) {
Expand Down Expand Up @@ -208,7 +218,7 @@ private static List<Long> calculateError(List<List<TestData>> clientDataList, lo
dv.allowedBytes -= (minDuration * dv.allowedBytes / dv.duration);
dv.generatedBytes -= (minDuration * dv.generatedBytes / dv.duration);
dv.duration -= minDuration;
if(dv.allowedBytes <= 0) {
if (dv.allowedBytes <= 0) {
throw new RuntimeException("Unexpected error");
// something failed here, find out why
}
Expand All @@ -219,23 +229,25 @@ private static List<Long> calculateError(List<List<TestData>> clientDataList, lo
result.add(new TestData(minDuration, sumAllowedBytes, sumProducedBytes));
}

List<Long> errors = new ArrayList<>();
// 10KBPs 15KBps 7KBps +3
List<Double> errors = new ArrayList<>();
for (TestData dv : result) {
long maxBytes = Math.min(throguhput * dv.duration, dv.generatedBytes);
long allowedBytes = dv.allowedBytes;
errors.add(Math.abs(allowedBytes - maxBytes));
System.out.println("Duration: " + dv.duration + ", Allowed Bytes: " + dv.allowedBytes + ", Generated Bytes: " + dv.generatedBytes);
errors.add(((double) (Math.abs(allowedBytes - maxBytes)) / throguhput));
log.info(
"Duration: {}, Allowed Bytes: {}, Generated Bytes: {}", dv.duration, dv.allowedBytes,
dv.generatedBytes
);
}

return errors;
}

private static double calculateAbsoluteError(List<Long> errors) {
return errors.stream().mapToLong(val -> val).sum();
private static double calculateAbsoluteError(List<Double> errors) {
return errors.stream().mapToDouble(val -> val).sum();
}

private static double calculateStandardDeviation(List<Long> errors) {
private static double calculateStandardDeviation(List<Double> errors) {
double mean = errors.stream().mapToDouble(val -> val).average().orElse(0.0);
double variance = errors.stream().mapToDouble(val -> Math.pow(val - mean, 2)).average().orElse(0.0);
return Math.sqrt(variance);
Expand All @@ -261,39 +273,8 @@ private void storeResult(
});
}

private RateLimiterService createRateLimiterSvc() throws UnknownHostException {
return new RateLimiterService(
new TrafficSender() {
@Override
public CompletableFuture<SuppressionData> send(ClientLoadInfo info) {
// receiver handler logic
SuppressionData suppressionData = new SuppressionData();
long delta = System.currentTimeMillis() - info.getTo();
log.info("Delta: {}ms", delta);
List<CompletableFuture<SuppressionFactor>> suppressionFactorFuture = new ArrayList<>();
info.getTopicUsageList().forEach((trafficData) -> {
suppressionFactorFuture.add(suppressionManager.addTrafficData(
info.getClientId(),
new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
).whenComplete((suppressionFactor, throwable) -> {
if (throwable != null) {
log.error("Error while calculating suppression factor", throwable);
return;
}
log.info(
"Topic: {}, SF thr-pt: {}", trafficData.getTopic(),
suppressionFactor.getThroughputFactor()
);
suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor);
}));
});
return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> suppressionData);
}
},
meterRegistry,
1,
HostUtils.getHostName()
);
private RateLimiterService createRateLimiterSvc(String clientId) throws UnknownHostException {
return new RateLimiterService(info -> suppressionManager.addTrafficDataAsync(info), meterRegistry, 1, clientId);
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
client1,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
client2,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
client3,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
client4,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,8 @@ public TrafficDataHandler(SuppressionManager suppressionManager) {
}

public CompletableFuture<ResponseMessage> handle(ClusterMessage message) {
ClientLoadInfo info = message.getData(ClientLoadInfo.class);
SuppressionData suppressionData = new SuppressionData();
long delta = System.currentTimeMillis() - info.getTo();
log.info("Delta: {}ms", delta);

List<CompletableFuture<SuppressionFactor>> suppressionFactorFuture = new ArrayList<>();

info.getTopicUsageList().forEach((trafficData) -> {
suppressionFactorFuture.add(suppressionManager.addTrafficData(
info.getClientId(),
new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
).whenComplete((suppressionFactor, throwable) -> {
if (throwable != null) {
log.error("Error while calculating suppression factor", throwable);
return;
}
log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor());
suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor);
}));
});

return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> message.getResponseMessage(suppressionData));
return suppressionManager.addTrafficDataAsync(message.getData(ClientLoadInfo.class))
.thenApply(message::getResponseMessage);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.verticles.webserver;

import com.flipkart.varadhi.controller.SuppressionService;
import com.flipkart.varadhi.qos.RateLimiter;
import com.flipkart.varadhi.qos.TopicRateLimiter;
import com.flipkart.varadhi.qos.entity.RateLimiterType;
Expand Down Expand Up @@ -29,13 +30,13 @@ public class RateLimiterService {// todo(rl): should be an interface for update
private final MeterRegistry meterRegistry;


public RateLimiterService(TrafficSender trafficSender, MeterRegistry meterRegistry, int frequency, String clientId)
public RateLimiterService(SuppressionService suppressionService, MeterRegistry meterRegistry, int frequency, String clientId)
throws UnknownHostException {
topicRateLimiters = new HashMap<>();
trafficAggregator = new TrafficAggregator(
clientId,
frequency,
trafficSender,
suppressionService,
this,
Executors.newScheduledThreadPool(1)
);
Expand Down
Loading

0 comments on commit ab7dd60

Please sign in to comment.