Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
iosdev747 committed Oct 29, 2024
1 parent ab7dd60 commit 08e2aa6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

@Slf4j
public class SuppressionManager implements SuppressionService {
private Map<String, ClientHistory<TopicLoadInfo>> topicTrafficDataMap; // topic to client load info
private int windowSize;
private TopicLimitService topicLimitService;
private Ticker ticker;
private final Map<String, ClientHistory<TopicLoadInfo>> topicTrafficDataMap; // topic to client load info
private final int windowSize;
private final TopicLimitService topicLimitService;
private final Ticker ticker;

public SuppressionManager(int windowSize, TopicLimitService topicLimitService, Ticker ticker) {
this.topicTrafficDataMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -89,19 +89,19 @@ public CompletableFuture<SuppressionData> addTrafficDataAsync(ClientLoadInfo inf
SuppressionData suppressionData = new SuppressionData();
List<CompletableFuture<SuppressionFactor>> suppressionFactorFuture = new ArrayList<>();

info.getTopicUsageList().forEach((trafficData) -> {
suppressionFactorFuture.add(this.addTrafficData(
info.getClientId(),
new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
).whenComplete((suppressionFactor, throwable) -> {
if (throwable != null) {
log.error("Error while calculating suppression factor", throwable);
return;
}
log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor());
suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor);
}));
});
info.getTopicUsageList().forEach((trafficData) -> suppressionFactorFuture.add(
this.addTrafficData(info.getClientId(),
new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData)
).whenComplete((suppressionFactor, throwable) -> {
if (throwable != null) {
log.error("Error while calculating suppression factor", throwable);
return;
}
log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(),
suppressionFactor.getThroughputFactor()
);
suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor);
})));

return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> suppressionData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class RateLimiterServiceTest {

private static MeterRegistry meterRegistry;
private SuppressionManager suppressionManager;
private MockTicker ticker;
@Mock
private TopicLimitService topicLimitService;

Expand All @@ -59,7 +58,7 @@ public void setUpController() {
MockitoAnnotations.openMocks(this); // Initialize mocks

//setup controller side of things
ticker = new MockTicker(System.currentTimeMillis());
MockTicker ticker = new MockTicker(System.currentTimeMillis());
suppressionManager = new SuppressionManager(10, topicLimitService, ticker);
}

Expand Down Expand Up @@ -171,9 +170,7 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte
// iterate through each client data assuming every client starts from the same point
// consider duration and sum up the allowedBytes for each client and compare with the allowed throughput
List<List<TestData>> clientDataList = new ArrayList<>();
clientLoadMap.forEach((client, testDataList) -> {
clientDataList.add(testDataList);
});
clientLoadMap.forEach((client, testDataList) -> clientDataList.add(testDataList));
if (clientDataList.size() > 1) {
List<Double> errors = calculateNormalisedError(clientDataList, topicThroughputQuotaMap.get(topic));
log.info("topic: {} errors: {}", topic, errors);
Expand Down Expand Up @@ -206,14 +203,13 @@ private static List<Double> calculateNormalisedError(List<List<TestData>> client
break;
}

for (int i = 0, inputSize = clientDataList.size(); i < inputSize; i++) {
List<TestData> list = clientDataList.get(i);
if (!list.isEmpty()) {
TestData dv = list.get(0);
for (List<TestData> clientTestData : clientDataList) {
if (!clientTestData.isEmpty()) {
TestData dv = clientTestData.get(0);
sumAllowedBytes += (minDuration * dv.allowedBytes / dv.duration);
sumProducedBytes += (minDuration * dv.generatedBytes / dv.duration);
if (dv.duration == minDuration) {
list.remove(0);
clientTestData.remove(0);
} else {
dv.allowedBytes -= (minDuration * dv.allowedBytes / dv.duration);
dv.generatedBytes -= (minDuration * dv.generatedBytes / dv.duration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@
import com.flipkart.varadhi.cluster.messages.ResponseMessage;
import com.flipkart.varadhi.controller.SuppressionManager;
import com.flipkart.varadhi.qos.entity.ClientLoadInfo;
import com.flipkart.varadhi.qos.entity.SuppressionData;
import com.flipkart.varadhi.qos.entity.SuppressionFactor;
import com.flipkart.varadhi.qos.entity.TopicLoadInfo;
import com.flipkart.varadhi.utils.FutureUtil;

import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ private void sendTrafficUsageAndUpdateSuppressionFactor() {
long currentTime = System.currentTimeMillis();
loadInfo.setTo(currentTime);
// convert ConcurrentTopicData to TrafficData.list
topicTrafficMap.forEach((topic, data) -> {
loadInfo.getTopicUsageList().add(TrafficData.builder().topic(topic).bytesIn(data.bytesIn.sum()).rateIn(data.rateIn.sum()).build());
});
topicTrafficMap.forEach((topic, data) -> loadInfo.getTopicUsageList()
.add(TrafficData.builder().topic(topic).bytesIn(data.bytesIn.sum()).rateIn(data.rateIn.sum()).build()));
log.info("Sending traffic data to controller: {}", loadInfo);
// TODO(rl); simulate add delay for degradation;
suppressionService.addTrafficDataAsync(loadInfo).whenComplete(this::handleSuppressionDataResponse);
Expand All @@ -102,10 +101,10 @@ private void handleSuppressionDataResponse(
if (throwable != null) {
log.error("Error while receiving suppression data from controller", throwable);
} else {
suppressionData.getSuppressionFactor().forEach((topic, suppressionFactor) -> {
rateLimiterService.updateSuppressionFactor(
topic, RateLimiterType.THROUGHPUT_CHECK, suppressionFactor.getThroughputFactor());
});
suppressionData.getSuppressionFactor().forEach(
(topic, suppressionFactor) -> rateLimiterService.updateSuppressionFactor(topic,
RateLimiterType.THROUGHPUT_CHECK, suppressionFactor.getThroughputFactor()
));
}
}

Expand Down

0 comments on commit 08e2aa6

Please sign in to comment.