diff --git a/controller/build.gradle b/controller/build.gradle index 13ede2c8..c71c4ced 100644 --- a/controller/build.gradle +++ b/controller/build.gradle @@ -8,6 +8,7 @@ dependencies { api(project(":core")) api(project(':rateLimiter')) + implementation(project(":common")) implementation("io.vertx:vertx-micrometer-metrics") implementation('com.fasterxml.jackson.core:jackson-databind') implementation("org.apache.commons:commons-lang3") diff --git a/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java b/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java index 66341aaa..8b6b8867 100644 --- a/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionManager.java @@ -1,9 +1,15 @@ package com.flipkart.varadhi.controller; import com.flipkart.varadhi.qos.entity.ClientHistory; +import com.flipkart.varadhi.qos.entity.ClientLoadInfo; +import com.flipkart.varadhi.qos.entity.SuppressionData; import com.flipkart.varadhi.qos.entity.SuppressionFactor; import com.flipkart.varadhi.qos.entity.TopicLoadInfo; +import com.flipkart.varadhi.utils.FutureUtil; import com.google.common.base.Ticker; + +import java.util.ArrayList; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.mutable.MutableDouble; @@ -13,7 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j -public class SuppressionManager { +public class SuppressionManager implements SuppressionService { private Map> topicTrafficDataMap; // topic to client load info private int windowSize; private TopicLimitService topicLimitService; @@ -77,4 +83,26 @@ private boolean isExpired(long time) { return (ticker.read() - windowSize * 1000L) > time; } + @Override + public CompletableFuture addTrafficDataAsync(ClientLoadInfo info) { + + SuppressionData suppressionData = new SuppressionData(); + List> suppressionFactorFuture = new ArrayList<>(); + + info.getTopicUsageList().forEach((trafficData) -> { + suppressionFactorFuture.add(this.addTrafficData( + info.getClientId(), + new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData) + ).whenComplete((suppressionFactor, throwable) -> { + if (throwable != null) { + log.error("Error while calculating suppression factor", throwable); + return; + } + log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor()); + suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor); + })); + }); + + return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> suppressionData); + } } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficSender.java b/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionService.java similarity index 50% rename from server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficSender.java rename to controller/src/main/java/com/flipkart/varadhi/controller/SuppressionService.java index 43f753d1..7e089f26 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficSender.java +++ b/controller/src/main/java/com/flipkart/varadhi/controller/SuppressionService.java @@ -1,10 +1,11 @@ -package com.flipkart.varadhi.verticles.webserver; +package com.flipkart.varadhi.controller; import com.flipkart.varadhi.qos.entity.ClientLoadInfo; import com.flipkart.varadhi.qos.entity.SuppressionData; import java.util.concurrent.CompletableFuture; -public interface TrafficSender { - CompletableFuture send(ClientLoadInfo loadInfo); +public interface SuppressionService { + CompletableFuture addTrafficDataAsync(ClientLoadInfo info); + } diff --git a/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java b/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java index 6226badb..71f700ce 100644 --- a/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java +++ b/controller/src/test/java/com/flipkart/varadhi/controller/qos/RateLimiterServiceTest.java @@ -5,14 +5,8 @@ import com.flipkart.varadhi.config.AppConfiguration; import com.flipkart.varadhi.controller.SuppressionManager; import com.flipkart.varadhi.controller.TopicLimitService; -import com.flipkart.varadhi.qos.entity.ClientLoadInfo; -import com.flipkart.varadhi.qos.entity.SuppressionData; -import com.flipkart.varadhi.qos.entity.SuppressionFactor; -import com.flipkart.varadhi.qos.entity.TopicLoadInfo; -import com.flipkart.varadhi.utils.FutureUtil; import com.flipkart.varadhi.utils.HostUtils; import com.flipkart.varadhi.verticles.webserver.RateLimiterService; -import com.flipkart.varadhi.verticles.webserver.TrafficSender; import com.google.common.util.concurrent.RateLimiter; import io.micrometer.core.instrument.MeterRegistry; import lombok.Getter; @@ -30,7 +24,6 @@ import java.io.IOException; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,7 +35,6 @@ @Slf4j public class RateLimiterServiceTest { - // private RateLimiterService rateLimiterService; private static MeterRegistry meterRegistry; private SuppressionManager suppressionManager; private MockTicker ticker; @@ -50,16 +42,20 @@ public class RateLimiterServiceTest { private TopicLimitService topicLimitService; @BeforeAll + public static void setup() throws UnknownHostException { + HostUtils.initHostUtils(); + setupMetrics(); + } + public static void setupMetrics() { String[] args = {"src/test/resources/testConfiguration.yml"}; AppConfiguration configuration = readConfiguration(args); -// CoreServices services = new CoreServices(configuration); CoreServices.ObservabilityStack observabilityStack = new CoreServices.ObservabilityStack(configuration); meterRegistry = observabilityStack.getMeterRegistry(); } @BeforeEach - public void setUpController() throws UnknownHostException { + public void setUpController() { MockitoAnnotations.openMocks(this); // Initialize mocks //setup controller side of things @@ -67,8 +63,17 @@ public void setUpController() throws UnknownHostException { suppressionManager = new SuppressionManager(10, topicLimitService, ticker); } - private static Stream provideRateLimitTestCSVPaths() { + private static Stream provideRateLimitTestFilePaths() { return Stream.of( +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), +// Arguments.of("src/test/resources/simulation_profiles/test_load1.profile"), Arguments.of("src/test/resources/simulation_profiles/test_load1.profile") // todo(rl): fix these profiles // Arguments.of("src/test/resources/simulation_profiles/test_load.profile") @@ -78,7 +83,7 @@ private static Stream provideRateLimitTestCSVPaths() { } @ParameterizedTest - @MethodSource("provideRateLimitTestCSVPaths") + @MethodSource("provideRateLimitTestFilePaths") public void clientLoadSimulation(String filePath) throws IOException, InterruptedException { BufferedReader reader = new BufferedReader(new FileReader(filePath)); String line; @@ -97,22 +102,21 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte String topic = parts[1]; int throughputQuota = Integer.parseInt(topicLoads[0]); topicThroughputQuotaMap.put(topic, throughputQuota); + log.info("Setting throughput for topic: {}, throughput: {}", topic, throughputQuota); when(topicLimitService.getThroughput(topic)).thenReturn(throughputQuota); // check if ratelimiterservice exists for a topic - clientRateLimiterMap.putIfAbsent(client, createRateLimiterSvc()); + clientRateLimiterMap.putIfAbsent(client, createRateLimiterSvc(client)); Runnable clientLoadSimulator = () -> { StringBuilder sb = new StringBuilder(); sb.append("Client: ").append(client).append(" topic: ").append(topic).append(": \n"); -// long lastAllowedBytes = 0, lastRejectedBytes = 0; for (int i = 1; i < topicLoads.length; i++) { String[] loadParts = topicLoads[i].split(","); long duration = Long.parseLong(loadParts[0]) * 1000; // Convert to milliseconds long throughput = Long.parseLong(loadParts[1]); long qps = Long.parseLong(loadParts[2]); -// long allowedByClient = Long.parseLong(loadParts[3]); long dataSize = throughput / qps; long startTime = System.currentTimeMillis(); long allowedBytes = 0, rejectedBytes = 0; @@ -135,8 +139,14 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte .append("\n"); // store result data for error rate calculation later on - log.info("[] Client: {}, Topic: {}, Duration: {}, Allowed Bytes: {}, Rejected Bytes: {}", client, topic, duration, allowedBytes, rejectedBytes); - storeResult(topicClientLoadMap, client, topic, duration/1000, allowedBytes, allowedBytes + rejectedBytes); + log.info( + "[] Client: {}, Topic: {}, Duration: {}, Allowed Bytes: {}, Rejected Bytes: {}", client, + topic, duration, allowedBytes, rejectedBytes + ); + storeResult( + topicClientLoadMap, client, topic, duration / 1000, allowedBytes, + allowedBytes + rejectedBytes + ); } log.info(sb.toString()); }; @@ -164,8 +174,8 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte clientLoadMap.forEach((client, testDataList) -> { clientDataList.add(testDataList); }); - if(clientDataList.size() > 1) { - List errors = calculateError(clientDataList, topicThroughputQuotaMap.get(topic)); + if (clientDataList.size() > 1) { + List errors = calculateNormalisedError(clientDataList, topicThroughputQuotaMap.get(topic)); log.info("topic: {} errors: {}", topic, errors); log.info("topic: {} absolute error: {}", topic, calculateAbsoluteError(errors)); log.info("Standard Deviation: {}", calculateStandardDeviation(errors)); @@ -173,7 +183,7 @@ public void clientLoadSimulation(String filePath) throws IOException, Interrupte }); } - private static List calculateError(List> clientDataList, long throguhput) { + private static List calculateNormalisedError(List> clientDataList, long throguhput) { List result = new ArrayList<>(); while (true) { @@ -208,7 +218,7 @@ private static List calculateError(List> clientDataList, lo dv.allowedBytes -= (minDuration * dv.allowedBytes / dv.duration); dv.generatedBytes -= (minDuration * dv.generatedBytes / dv.duration); dv.duration -= minDuration; - if(dv.allowedBytes <= 0) { + if (dv.allowedBytes <= 0) { throw new RuntimeException("Unexpected error"); // something failed here, find out why } @@ -219,23 +229,25 @@ private static List calculateError(List> clientDataList, lo result.add(new TestData(minDuration, sumAllowedBytes, sumProducedBytes)); } - List errors = new ArrayList<>(); -// 10KBPs 15KBps 7KBps +3 + List errors = new ArrayList<>(); for (TestData dv : result) { long maxBytes = Math.min(throguhput * dv.duration, dv.generatedBytes); long allowedBytes = dv.allowedBytes; - errors.add(Math.abs(allowedBytes - maxBytes)); - System.out.println("Duration: " + dv.duration + ", Allowed Bytes: " + dv.allowedBytes + ", Generated Bytes: " + dv.generatedBytes); + errors.add(((double) (Math.abs(allowedBytes - maxBytes)) / throguhput)); + log.info( + "Duration: {}, Allowed Bytes: {}, Generated Bytes: {}", dv.duration, dv.allowedBytes, + dv.generatedBytes + ); } return errors; } - private static double calculateAbsoluteError(List errors) { - return errors.stream().mapToLong(val -> val).sum(); + private static double calculateAbsoluteError(List errors) { + return errors.stream().mapToDouble(val -> val).sum(); } - private static double calculateStandardDeviation(List errors) { + private static double calculateStandardDeviation(List errors) { double mean = errors.stream().mapToDouble(val -> val).average().orElse(0.0); double variance = errors.stream().mapToDouble(val -> Math.pow(val - mean, 2)).average().orElse(0.0); return Math.sqrt(variance); @@ -261,39 +273,8 @@ private void storeResult( }); } - private RateLimiterService createRateLimiterSvc() throws UnknownHostException { - return new RateLimiterService( - new TrafficSender() { - @Override - public CompletableFuture send(ClientLoadInfo info) { - // receiver handler logic - SuppressionData suppressionData = new SuppressionData(); - long delta = System.currentTimeMillis() - info.getTo(); - log.info("Delta: {}ms", delta); - List> suppressionFactorFuture = new ArrayList<>(); - info.getTopicUsageList().forEach((trafficData) -> { - suppressionFactorFuture.add(suppressionManager.addTrafficData( - info.getClientId(), - new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData) - ).whenComplete((suppressionFactor, throwable) -> { - if (throwable != null) { - log.error("Error while calculating suppression factor", throwable); - return; - } - log.info( - "Topic: {}, SF thr-pt: {}", trafficData.getTopic(), - suppressionFactor.getThroughputFactor() - ); - suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor); - })); - }); - return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> suppressionData); - } - }, - meterRegistry, - 1, - HostUtils.getHostName() - ); + private RateLimiterService createRateLimiterSvc(String clientId) throws UnknownHostException { + return new RateLimiterService(info -> suppressionManager.addTrafficDataAsync(info), meterRegistry, 1, clientId); } @Getter diff --git a/controller/src/test/resources/simulation_profiles/test_load2.profile b/controller/src/test/resources/simulation_profiles/test_load2.profile new file mode 100644 index 00000000..2f633598 --- /dev/null +++ b/controller/src/test/resources/simulation_profiles/test_load2.profile @@ -0,0 +1,4 @@ +client1,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100 +client2,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100 +client3,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100 +client4,topic1,10000:10,2000,100:10,3000,100:10,4000,100:10,5000,100:10,100000,100 diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java b/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java index 87addb85..bb17e02d 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/controller/TrafficDataHandler.java @@ -25,28 +25,8 @@ public TrafficDataHandler(SuppressionManager suppressionManager) { } public CompletableFuture handle(ClusterMessage message) { - ClientLoadInfo info = message.getData(ClientLoadInfo.class); - SuppressionData suppressionData = new SuppressionData(); - long delta = System.currentTimeMillis() - info.getTo(); - log.info("Delta: {}ms", delta); - - List> suppressionFactorFuture = new ArrayList<>(); - - info.getTopicUsageList().forEach((trafficData) -> { - suppressionFactorFuture.add(suppressionManager.addTrafficData( - info.getClientId(), - new TopicLoadInfo(info.getClientId(), info.getFrom(), info.getTo(), trafficData) - ).whenComplete((suppressionFactor, throwable) -> { - if (throwable != null) { - log.error("Error while calculating suppression factor", throwable); - return; - } - log.info("Topic: {}, SF thr-pt: {}", trafficData.getTopic(), suppressionFactor.getThroughputFactor()); - suppressionData.getSuppressionFactor().put(trafficData.getTopic(), suppressionFactor); - })); - }); - - return FutureUtil.waitForAll(suppressionFactorFuture).thenApply(__ -> message.getResponseMessage(suppressionData)); + return suppressionManager.addTrafficDataAsync(message.getData(ClientLoadInfo.class)) + .thenApply(message::getResponseMessage); } } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/RateLimiterService.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/RateLimiterService.java index c68ba2c3..fbfeebad 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/RateLimiterService.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/RateLimiterService.java @@ -1,5 +1,6 @@ package com.flipkart.varadhi.verticles.webserver; +import com.flipkart.varadhi.controller.SuppressionService; import com.flipkart.varadhi.qos.RateLimiter; import com.flipkart.varadhi.qos.TopicRateLimiter; import com.flipkart.varadhi.qos.entity.RateLimiterType; @@ -29,13 +30,13 @@ public class RateLimiterService {// todo(rl): should be an interface for update private final MeterRegistry meterRegistry; - public RateLimiterService(TrafficSender trafficSender, MeterRegistry meterRegistry, int frequency, String clientId) + public RateLimiterService(SuppressionService suppressionService, MeterRegistry meterRegistry, int frequency, String clientId) throws UnknownHostException { topicRateLimiters = new HashMap<>(); trafficAggregator = new TrafficAggregator( clientId, frequency, - trafficSender, + suppressionService, this, Executors.newScheduledThreadPool(1) ); diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java index 21613c3d..e59b4888 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/TrafficAggregator.java @@ -1,5 +1,6 @@ package com.flipkart.varadhi.verticles.webserver; +import com.flipkart.varadhi.controller.SuppressionService; import com.flipkart.varadhi.qos.entity.ClientLoadInfo; import com.flipkart.varadhi.qos.entity.RateLimiterType; import com.flipkart.varadhi.qos.entity.SuppressionData; @@ -24,7 +25,7 @@ public class TrafficAggregator { private final ClientLoadInfo loadInfo; private final int frequency; private final ScheduledExecutorService scheduledExecutorService; - private final TrafficSender trafficSender; + private final SuppressionService suppressionService; private final RateLimiterService rateLimiterService; private final Map topicTrafficMap; @@ -42,12 +43,12 @@ public ConcurrentTopicData(String topic) { } public TrafficAggregator( - String clientId, int frequency, TrafficSender trafficSender, RateLimiterService rateLimiterService, + String clientId, int frequency, SuppressionService suppressionService, RateLimiterService rateLimiterService, ScheduledExecutorService scheduledExecutorService ) { this.frequency = frequency; this.scheduledExecutorService = scheduledExecutorService; - this.trafficSender = trafficSender; + this.suppressionService = suppressionService; this.rateLimiterService = rateLimiterService; this.loadInfo = new ClientLoadInfo(clientId, 0,0, new ArrayList<>()); this.topicTrafficMap = new ConcurrentHashMap<>(); @@ -91,7 +92,7 @@ private void sendTrafficUsageAndUpdateSuppressionFactor() { }); log.info("Sending traffic data to controller: {}", loadInfo); // TODO(rl); simulate add delay for degradation; - trafficSender.send(loadInfo).whenComplete(this::handleSuppressionDataResponse); + suppressionService.addTrafficDataAsync(loadInfo).whenComplete(this::handleSuppressionDataResponse); resetData(currentTime); } diff --git a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java index bb80108c..45271d80 100644 --- a/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java +++ b/server/src/main/java/com/flipkart/varadhi/verticles/webserver/WebServerVerticle.java @@ -8,6 +8,7 @@ import com.flipkart.varadhi.cluster.VaradhiClusterManager; import com.flipkart.varadhi.cluster.messages.ClusterMessage; import com.flipkart.varadhi.config.AppConfiguration; +import com.flipkart.varadhi.controller.SuppressionService; import com.flipkart.varadhi.core.cluster.ControllerApi; import com.flipkart.varadhi.entities.StorageTopic; import com.flipkart.varadhi.entities.TopicCapacityPolicy; @@ -139,10 +140,10 @@ private void setupEntityServices() { subscriptionService = new SubscriptionService(shardProvisioner, controllerApiProxy, metaStore); try { // use host address as clientId for now. - rateLimiterService = new RateLimiterService(new TrafficSender() { + rateLimiterService = new RateLimiterService(new SuppressionService() { final MessageExchange exchange = clusterManager.getExchange(vertx); @Override - public CompletableFuture send(ClientLoadInfo loadInfo) { + public CompletableFuture addTrafficDataAsync(ClientLoadInfo loadInfo) { ClusterMessage msg = ClusterMessage.of(loadInfo); return exchange.request(ROUTE_CONTROLLER, "collect", msg) .thenApply(rm -> rm.getResponse(SuppressionData.class));