diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java b/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java index 8b6b8867..6ded2716 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java @@ -20,10 +20,10 @@ @Slf4j public class SuppressionManager implements SuppressionService { - private Map> topicTrafficDataMap; // topic to client load info - private int windowSize; - private TopicLimitService topicLimitService; - private Ticker ticker; + private final Map> topicTrafficDataMap; // topic to client load info + private final int windowSize; + private final TopicLimitService topicLimitService; + private final Ticker ticker; public SuppressionManager(int windowSize, TopicLimitService topicLimitService, Ticker ticker) { this.topicTrafficDataMap = new ConcurrentHashMap<>(); @@ -89,19 +89,19 @@ public CompletableFuture addTrafficDataAsync(ClientLoadInfo inf SuppressionData suppressionData = new SuppressionData(); List> suppressionFactorFuture = new ArrayList<>(); - info.getTopicUsageList().forEach((trafficData) -> { - suppressionFactorFuture.add(this.addTrafficData( - info.getClientId(), - new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData) - ).whenComplete((suppressionFactor, throwable) -> { - if (throwable != null) { - log.error("Error while calculating suppression factor", throwable); - return; - } - log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor()); - suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor); - })); - }); + info.getTopicUsageList().forEach((trafficData) -> suppressionFactorFuture.add( + this.addTrafficData(info.getClientId(), + new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData) + ).whenComplete((suppressionFactor, throwable) -> { + if (throwable != null) { + log.error("Error while calculating suppression factor", throwable); + return; + } + log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), + suppressionFactor.getThroughputFactor() + ); + suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor); + }))); return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> suppressionData); } diff --git a/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java b/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java index 71f700ce..4389a3dc 100644 --- a/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java +++ b/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java @@ -37,7 +37,6 @@ public class RateLimiterServiceTest { private static MeterRegistry meterRegistry; private SuppressionManager suppressionManager; - private MockTicker ticker; @Mock private TopicLimitService topicLimitService; @@ -59,7 +58,7 @@ public void setUpController() { MockitoAnnotations.openMocks(this); // Initialize mocks //setup controller side of things - ticker = new MockTicker(System.currentTimeMillis()); + MockTicker ticker = new MockTicker(System.currentTimeMillis()); suppressionManager = new SuppressionManager(10, topicLimitService, ticker); } @@ -171,9 +170,7 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte // iterate through each client data assuming every client starts from the same point // consider duration and sum up the allowedBytes for each client and compare with the allowed throughput List> clientDataList = new ArrayList<>(); - clientLoadMap.forEach((client, testDataList) -> { - clientDataList.add(testDataList); - }); + clientLoadMap.forEach((client, testDataList) -> clientDataList.add(testDataList)); if (clientDataList.size() > 1) { List errors = calculateNormalisedError(clientDataList, topicThroughputQuotaMap.get(topic)); log.info("topic: {} errors: {}", topic, errors); @@ -206,14 +203,13 @@ private static List calculateNormalisedError(List> client break; } - for (int i = 0, inputSize = clientDataList.size(); i < inputSize; i++) { - List list = clientDataList.get(i); - if (!list.isEmpty()) { - TestData dv = list.get(0); + for (List clientTestData : clientDataList) { + if (!clientTestData.isEmpty()) { + TestData dv = clientTestData.get(0); sumAllowedBytes += (minDuration * dv.allowedBytes / dv.duration); sumProducedBytes += (minDuration * dv.generatedBytes / dv.duration); if (dv.duration == minDuration) { - list.remove(0); + clientTestData.remove(0); } else { dv.allowedBytes -= (minDuration * dv.allowedBytes / dv.duration); dv.generatedBytes -= (minDuration * dv.generatedBytes / dv.duration); diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java index bb17e02d..38b75a36 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java @@ -4,14 +4,6 @@ import com.flipkart.varadhi.cluster.messages.ResponseMessage; import com.flipkart.varadhi.controller.SuppressionManager; import com.flipkart.varadhi.qos.entity.ClientLoadInfo; -import com.flipkart.varadhi.qos.entity.SuppressionData; -import com.flipkart.varadhi.qos.entity.SuppressionFactor; -import com.flipkart.varadhi.qos.entity.TopicLoadInfo; -import com.flipkart.varadhi.utils.FutureUtil; - -import java.util.ArrayList; -import java.util.List; - import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CompletableFuture; diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java index e59b4888..0452581c 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java @@ -87,9 +87,8 @@ private void sendTrafficUsageAndUpdateSuppressionFactor() { long currentTime = System.currentTimeMillis(); loadInfo.setTo(currentTime); // convert ConcurrentTopicData to TrafficData.list - topicTrafficMap.forEach((topic, data) -> { - loadInfo.getTopicUsageList().add(TrafficData.builder().topic(topic).bytesIn(data.bytesIn.sum()).rateIn(data.rateIn.sum()).build()); - }); + topicTrafficMap.forEach((topic, data) -> loadInfo.getTopicUsageList() + .add(TrafficData.builder().topic(topic).bytesIn(data.bytesIn.sum()).rateIn(data.rateIn.sum()).build())); log.info("Sending traffic data to controller: {}", loadInfo); // TODO(rl); simulate add delay for degradation; suppressionService.addTrafficDataAsync(loadInfo).whenComplete(this::handleSuppressionDataResponse); @@ -102,10 +101,10 @@ private void handleSuppressionDataResponse( if (throwable != null) { log.error("Error while receiving suppression data from controller", throwable); } else { - suppressionData.getSuppressionFactor().forEach((topic, suppressionFactor) -> { - rateLimiterService.updateSuppressionFactor( - topic, RateLimiterType.THROUGHPUT_CHECK, suppressionFactor.getThroughputFactor()); - }); + suppressionData.getSuppressionFactor().forEach( + (topic, suppressionFactor) -> rateLimiterService.updateSuppressionFactor(topic, + RateLimiterType.THROUGHPUT_CHECK, suppressionFactor.getThroughputFactor() + )); } }